Stream processing using streamer#
The module mpservice.streamer provides utilities for stream processing with threading or multiprocessing (or asyncio to a lesser degree).
An input data stream goes through a series of operations. The output from one operation becomes the input to the next operation. One or more “primary” operations are so heavy that they can benefit from concurrency via threading (if they are I/O bound) or multiprocessing (if they are CPU bound). The other operations are typically light weight, although important in their own right. These operations perform batching, unbatching, buffering, mapping (simple transformation), filtering, grouping, etc.
To fix terminology, we’ll call the main methods of the class Stream “operators” or “operations”.
Each operator adds a “streamlet”. The behavior of a Stream object is embodied by its chain of
streamlets, which is accessible via the public attribute Stream.streamlets
(although there is little need to access it).
“Consumption” of the stream entails “pulling” at the end of the last streamlet, which,
in a chain reaction, pulls each data element through the entire series
of streamlets or operators.
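The pull-based flow can be illustrated with plain Python generators. This is only a sketch of the general idea, not a description of how mpservice implements its streamlets; the names `source`, `doubled`, and `log` are made up for the illustration.

```python
log = []  # records the order in which work actually happens


def source(n):
    for i in range(n):
        log.append(f"produce {i}")
        yield i


def doubled(items):
    for x in items:
        log.append(f"double {x}")
        yield x * 2


# Setting up the chain runs nothing yet.
pipeline = doubled(source(3))
assert log == []

# Pulling at the end of the chain drags each element through every stage.
results = list(pipeline)
print(results)  # [0, 2, 4]
print(log[:2])  # ['produce 0', 'double 0']
```

Each element travels through the whole chain before the next one is produced, which is why an unlimited input is workable.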
Introduction#
Let’s make up an I/O-bound operation which takes an input and produces an output. (Think of calling a remote service with an input and waiting for the result.)
>>> from time import sleep
>>> from random import random
>>> def double(x):
...     sleep(random() * 0.1)
...     return x * 2
Suppose we have a long stream of input values we want to process.
We feed this stream into a Stream object:
>>> from mpservice.streamer import Stream
>>> data_stream = Stream(range(100))
The input stream is often a list, but more generally, it can be any
Iterable, possibly unlimited.
Since double is an I/O-bound operation, let’s use multiple threads to speed up
the processing of the input stream.
For this purpose, we add a parmap() (i.e. “parallel map”) operator to the stream:
>>> data_stream.parmap(double, executor='thread', concurrency=8)
<mpservice.streamer._streamer.Stream object at 0x7...>
This requests the function double to be run in 8 threads;
they will collectively process the input stream.
Adding the operator is just “setup”–nothing runs until we start to retrieve results.
Later we’ll see that we can add more than one operator, and there are other types of operators.
Because a Stream is an Iterator,
“retrieving the results” usually amounts to iterating over it:
>>> total = 0
>>> for y in data_stream:
...     total += y
>>> total
9900
What is the expected result?
>>> sum((v*2 for v in range(100)))
9900
This confirms the result is correct.
Despite the concurrency in the operation, the order of the input elements is preserved. In other words, the output elements correspond to the input elements in order. Let’s verify:
>>> data_stream = Stream(range(100)).parmap(double, executor='thread', concurrency=8)
>>> for k, y in enumerate(data_stream):
...     print(y, end=' ')
...     if (k + 1) % 10 == 0:
...         print('')
0 2 4 6 8 10 12 14 16 18
20 22 24 26 28 30 32 34 36 38
40 42 44 46 48 50 52 54 56 58
60 62 64 66 68 70 72 74 76 78
80 82 84 86 88 90 92 94 96 98
100 102 104 106 108 110 112 114 116 118
120 122 124 126 128 130 132 134 136 138
140 142 144 146 148 150 152 154 156 158
160 162 164 166 168 170 172 174 176 178
180 182 184 186 188 190 192 194 196 198
Note that we had to re-create the streamer object because,
after the first iteration, the stream was “consumed” and gone.
Also note that we can either add an operator in a statement, or call it as a function, often in a “chained” fashion.
(An operator method modifies self, but also returns self in the end, hence facilitating chained calls.)
Suppose we want to follow the heavy double operation with a shift applied to each element:
>>> def shift(x, amount):
...     return x + amount
This is quick and easy, so we decide to do it “in-line” with map():
>>> data_stream = Stream(range(20))
>>> data_stream.parmap(double, executor='thread', concurrency=8)
<mpservice.streamer._streamer.Stream object at 0x7...>
>>> data_stream.map(shift, amount=0.8)
<mpservice.streamer._streamer.Stream object at 0x7...>
>>> for k, y in enumerate(data_stream):
...     print(y, end=' ')
...     if (k + 1) % 10 == 0:
...         print('')
0.8 2.8 4.8 6.8 8.8 10.8 12.8 14.8 16.8 18.8
20.8 22.8 24.8 26.8 28.8 30.8 32.8 34.8 36.8 38.8
The first three lines are equivalent to this one line:
>>> data_stream = Stream(range(20)).parmap(double, executor='thread', concurrency=8).map(shift, amount=0.8)
Operators#
Stream has many “operators”. They can be characterized in a few ways:
- One-to-one (will not change the elements’ count or order):
- One-to-one (will change the elements’ order, but not count):
- Many-to-one (may shrink the stream):
- One-to-many (may expand the stream):
- Selection or filtering (may drop elements):
- Concurrent (using threads or processes):
- Read-only (will not change the elements):
All these methods preserve the order of the elements, with the sole exception of
shuffle().
The operation in parmap is supposedly heavy and expensive.
All the other operations are meant to be lightweight and simple.
These methods can be called either as a single statement, or in a “chained” fashion. They “set up”, or “add”, operators to the streamer. However, they do not start executing the operations.
The operators that have been added to a Stream will start once we start to “consume” the stream,
that is, to retrieve elements of the stream.
Compared to thinking “the operators start”, it’s more intuitive to think
“the elements start to flow” through the operators in the order they have been added.
The “consuming” methods are “pulling” at the end of the final operator.
There are several ways to consume the stream:
- Iterate over the Stream object, because it implements __iter__().
- Call the method collect() to get all the elements in a list (if you know there are not too many of them!).
- Call the method drain() to “finish off” the operations. This does not return the elements of the stream, but rather just the count of them. This is used when the final operator exists mainly for a side effect, such as saving things to a database.
The latter two methods are trivial applications of the first.
Additional utilities separate from the class Stream#
A few of the stream operators are exposed on the module, so that they can be used standalone.
These include Batcher, Unbatcher, and Parmapper.
There is a module function tee(), which is analogous to the standard
itertools.tee.
The class EagerBatcher is analogous to Batcher but has a timeout,
which controls how long to wait before yielding an under-sized batch.
The classes IterableQueue, ProcessRunner and ProcessRunee are unrelated to Stream.
They have their own use cases.
There are two helper functions fifo_stream() and async_fifo_stream() that can be useful independent of Stream.
They implement a pattern that preserves element order in concurrent processing.
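The order-preserving pattern can be sketched with the standard library alone. The helper below is hypothetical and much simpler than fifo_stream(); it only shows the core idea: submit work to a pool, keep the futures in a first-in-first-out queue, and always wait on the oldest one. The names `ordered_concurrent_map`, `workers`, and `slow_double` are assumptions made for this illustration.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def ordered_concurrent_map(instream, func, *, workers=4, capacity=8):
    """Yield func(x) for each x in instream, in input order.

    At most `capacity` elements are in flight (submitted but not yet yielded).
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = deque()  # futures, oldest first
        for x in instream:
            pending.append(pool.submit(func, x))
            if len(pending) >= capacity:
                # Always wait on the oldest future, so output order == input order.
                yield pending.popleft().result()
        # Input exhausted: flush what is still in flight, oldest first.
        while pending:
            yield pending.popleft().result()


def slow_double(x):
    sleep(0.01 * (x % 3))  # uneven latency, so completion order differs
    return x * 2


out = list(ordered_concurrent_map(range(20), slow_double))
print(out == [x * 2 for x in range(20)])  # True
```

Results may complete out of order inside the pool, but they leave the queue strictly in submission order.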
API reference#
- class mpservice.streamer.Stream[source]#
Bases: Iterable[Elem]
The class Stream is the “entry-point” for the “streamer” utilities. A user constructs a Stream object by passing an Iterable to it, then calls its methods to use it. Most of the methods return the object itself, facilitating calls in a “chained” fashion, like this:
    s = Stream(...).map(...).filter(...).batch(...).parmap(...).unbatch(...)
However, these methods modify the object in-place, hence the above is equivalent to calling the methods one by one:
    s = Stream(...)
    s.map(...)
    s.filter(...)
    s.batch(...)
    s.parmap(...)
    s.unbatch(...)
- __init__(instream: Iterable, /)[source]#
- Parameters:
- instream
The input stream of elements, possibly unlimited.
- drain() int[source]#
Drain off the stream and return the number of elements processed.
This method is for the side effect: the entire stream has been processed by all the operations and results have been taken care of, for example, the final operation may have saved results in a database.
If you need more info about the processing, such as inspecting exceptions as they happen (if
return_exceptions to parmap() is True), then don’t use this method. Instead, iterate over the streamer yourself and do whatever you need to do.
- collect() list[Elem][source]#
Return all the elements in a list.
Warning
Do not call this method on “big data”.
- map(func: Callable[[T], Any], /, **kwargs) Self[source]#
Perform a simple transformation on each data element.
This operation happens “inline”: no other threads or processes are used. For this reason, the method is for “light-weight” transforms.
This is a 1-to-1 transform from the input stream to the output stream. This method can neither add nor skip elements in the stream.
If the logic needs to keep some state or history info, then define a class and implement its __call__ method. For example, to print out every hundredth value for information:
    class Peek:
        def __init__(self):
            self._count = 0

        def __call__(self, x):
            self._count += 1
            if self._count == 100:
                print(x)
                self._count = 0
            return x

    obj.map(Peek())
(This functionality is already provided by peek().)
- Parameters:
- func
A function that takes a data element and returns a new value; the new values (which do not have to differ from the original) form the new stream going forward.
- **kwargs
Additional keyword arguments to
func, after the first argument, which is the data element.
- filter(func: Callable[[T], bool], /, **kwargs) Self[source]#
Select data elements to keep in the stream according to the predicate func.
This method can be used to either “keep” or “drop” elements according to various conditions. If the logic needs to keep some state or history info, then define a class and implement its __call__ method. For example, to “drop the first 100 elements”:
    class Drop:
        def __init__(self, n):
            self._n = n
            self._count = 0

        def __call__(self, x):
            self._count += 1
            if self._count <= self._n:
                return False
            return True

    obj.filter(Drop(100))
- Parameters:
- func
A function that takes a data element and returns True or False to indicate whether the element should be kept in or dropped from the stream.
This function should not make changes to the input data element.
- **kwargs
Additional keyword arguments to
func, after the first argument, which is the data element.
- filter_exceptions(drop_exc_types: type[BaseException] | tuple[type[BaseException], ...] | None = None, keep_exc_types: type[BaseException] | tuple[type[BaseException], ...] | None = None) Self[source]#
If a call to parmap() upstream has specified return_exceptions=True, then its output stream may contain Exception objects. Other methods such as map() may also deliberately capture and return Exception objects. filter_exceptions determines which Exception objects in the stream should be dropped, kept, or raised.
While parmap() and other operators can choose to continue processing the stream by returning rather than raising exceptions, a subsequent filter_exceptions drops known types of exception objects so that the next operator does not receive Exception objects as inputs.
The default behavior (both drop_exc_types and keep_exc_types are None) is to raise any Exception object that is encountered.
A useful pattern is to specify one or a few known exception types to drop, and crash on any other unexpected exception.
- Parameters:
- drop_exc_types
These types of exceptions are dropped from the stream. These should be one or a few carefully identified exception types that you know can be safely ignored.
If None (the default) or () or [], no exception object is dropped.
To drop all exceptions, use Exception or even BaseException.
- keep_exc_types
These types of exceptions are kept in the stream.
If None (the default), no exception object is kept.
To keep all exceptions, use Exception or even BaseException.
An exception object that is neither kept nor dropped will be raised.
Note
The members in keep_exc_types and drop_exc_types should be distinct. If there is any common member, then it is kept because the keep_exc_types condition is checked first.
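The keep/drop/raise rules can be sketched as a small generator. This is a standalone illustration written from the description above, not the library's code; `filter_exceptions_sketch` and `_as_tuple` are hypothetical names.

```python
def _as_tuple(types):
    """Normalize None, (), [], a single type, or a sequence of types to a tuple."""
    if not types:
        return ()
    if isinstance(types, type):
        return (types,)
    return tuple(types)


def filter_exceptions_sketch(items, drop_exc_types=None, keep_exc_types=None):
    keep = _as_tuple(keep_exc_types)
    drop = _as_tuple(drop_exc_types)
    for x in items:
        if not isinstance(x, BaseException):
            yield x  # regular elements pass through untouched
        elif isinstance(x, keep):
            yield x  # checked first, so "keep" wins over "drop"
        elif isinstance(x, drop):
            continue  # silently dropped
        else:
            raise x  # neither kept nor dropped


data = [1, ValueError("bad"), 2, KeyError("k"), 3]
kept = list(
    filter_exceptions_sketch(data, drop_exc_types=ValueError, keep_exc_types=KeyError)
)
# kept holds 1, 2, the KeyError object, and 3; the ValueError is gone.

# With both arguments left as None, the first exception object is raised.
try:
    list(filter_exceptions_sketch([1, ValueError("bad")]))
    raised = False
except ValueError:
    raised = True
```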
- peek(*, print_func: ~collections.abc.Callable[[str], None] | None = None, interval: int | float = 1, exc_types: ~collections.abc.Sequence[type[BaseException]] | None = <class 'BaseException'>, with_exc_tb: bool = True, prefix: str = '', suffix: str = '') Self[source]#
Take a peek at the data element before it continues in the stream.
This is implemented by map(), where the mapper function prints some info under certain conditions before returning the input value unchanged. If this does not do what you need, just create your own function to pass to map().
- Parameters:
- print_func
A function that will be used to print messages. This should take a str and return nothing.
The default is the built-in print. It’s often useful to pass in a logging function such as logger.info.
- interval
Print out the data element at this interval; for example, 1000 means printing every 1000th element. The default, per the signature above, is 1, which prints every element.
If it is a float, then it must be strictly between 0 and 1 (both endpoints excluded). This will be taken as the (target) fraction of elements that are printed.
To turn off the printouts by this condition, pass in None.
- exc_types
If the element is an exception object of these types, print it out. This is regardless of the interval value.
If BaseException (the default), all exception objects are printed.
To turn off the printouts by this condition, pass in None or () or []. In this case, interval is the only criterion for determining whether an element should be printed, whether or not the element is an Exception.
- with_exc_tb
If True, the traceback, if available, will be printed when an Exception object is printed, the printing being determined by either interval or exc_types.
- shuffle(buffer_size: int = 1000) Self[source]#
Shuffle the elements using a buffer as intermediate storage.
If buffer_size is >= the number of elements, then the shuffling is fully random. Otherwise, the shuffling is somewhat “localized”.
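One common way to shuffle a stream through a fixed-size buffer is sketched below. Whether shuffle() works exactly this way is an assumption; the sketch is only meant to show why a small buffer gives “localized” shuffling. The name `buffered_shuffle` and its `rng` parameter are hypothetical.

```python
import random


def buffered_shuffle(items, buffer_size=1000, rng=None):
    rng = rng or random.Random()
    buffer = []
    for x in items:
        if len(buffer) < buffer_size:
            buffer.append(x)  # still filling the buffer
        else:
            # Buffer is full: emit a random resident and put x in its place.
            i = rng.randrange(buffer_size)
            yield buffer[i]
            buffer[i] = x
    # Input exhausted: emit whatever is left, in random order.
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(buffered_shuffle(range(100), buffer_size=10, rng=random.Random(0)))
```

With a buffer of 10, the element emitted at position k can only come from the first k + 10 inputs, so an element can jump ahead by a limited amount only. When the buffer holds the whole input, the final `rng.shuffle` makes the result a full random permutation.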
- head(n: int) Self[source]#
Take the first n elements and ignore the rest. If the entire stream has fewer than n elements, just take all of them.
This does not delegate to filter, because filter would need to walk through the entire stream, which is not needed for head.
- tail(n: int) Self[source]#
Take the last n elements and ignore all the previous ones. If the entire stream has fewer than n elements, just take all of them.
Note
n data elements need to be kept in memory, hence n should not be “too large” for the typical size of the data elements.
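The different costs of head and tail can be seen with standard-library equivalents. This is a sketch using itertools.islice and collections.deque, not the library's implementation; the helper names and the `pulled` list are made up for the illustration.

```python
from collections import deque
from itertools import islice


def head(items, n):
    # Stops pulling from the input as soon as n elements have been taken.
    return islice(items, n)


def tail(items, n):
    # Must walk the whole input, but keeps only the latest n elements in memory.
    return iter(deque(items, maxlen=n))


pulled = []  # records which elements were actually pulled from the source


def numbers():
    for i in range(1_000_000):
        pulled.append(i)
        yield i


first = list(head(numbers(), 3))
last = list(tail(range(10), 3))
print(first, len(pulled))  # [0, 1, 2] 3
print(last)                # [7, 8, 9]
```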
- groupby(key: Callable[[T], Any], /, **kwargs) Self[source]#
key takes a data element and outputs a value. Consecutive elements that have the same value of this output will be yielded as a generator.
Following this operator, every element in the output stream is a tuple with the key and the values (as a generator).
This function is similar to the standard itertools.groupby.
The output of key is usually a str or int or a tuple of a few strings or ints.
**kwargs are additional keyword arguments to key.
Examples
>>> data = ['atlas', 'apple', 'answer', 'bee', 'block', 'away', 'peter', 'question', 'plum', 'please']
>>> print(Stream(data).groupby(lambda x: x[0]).map(lambda x: list(x[1])).collect())
[['atlas', 'apple', 'answer'], ['bee', 'block'], ['away'], ['peter'], ['question'], ['plum', 'please']]
- batch(batch_size: int) Self[source]#
Take elements from an input stream and bundle them up into batches up to a size limit, and produce the batches as lists.
The output batches are all of the specified size, except possibly the final batch. There is no ‘timeout’ logic to proceed eagerly with a partial batch. For efficiency, this requires the input stream to have a steady supply. If that is a concern, having a buffer() on the input stream prior to batch() may help.
Examples
>>> ss = Stream(range(10)).batch(3)
>>> print(ss.collect())
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
- unbatch() Self[source]#
Turn a stream of lists into a stream of individual elements.
This is sometimes used to correspond with a previous batch(), but that is by no means a requirement. The only requirement is that the input elements are lists.
unbatch can be combined with map() to implement “expanding” a stream, like this:
>>> def explode(x):
...     if isinstance(x, int) and x > 0:
...         return [x] * x
...     return []
>>> ss = Stream([0, 1, 2, 'a', -1, 'b', 3, 4]).map(explode).unbatch()
>>> print(ss.collect())
[1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
In fact, elements of the input stream do not have to be lists. They can be any Iterable. For example:
>>> def expand(n):
...     for _ in range(n):
...         yield n
>>>
>>> stream = Stream((1, 2, 4, 3, 0, 5)).map(expand).unbatch().collect()
>>> print(stream)
[1, 2, 2, 4, 4, 4, 4, 3, 3, 3, 5, 5, 5, 5, 5]
- accumulate(func: ~collections.abc.Callable[[~typing.Any, ~mpservice.streamer._streamer.T], ~typing.Any], initializer: ~typing.Any = <object object>, **kwargs) Self[source]#
This method is like “cumulative sum”, but the operation is specified by func, hence does not need to be “sum”. If the last element in the output stream is x and the upcoming element in the input stream is y, then the next element in the output stream is
    func(x, y, **kwargs)
If initializer is not provided, then the first element is output as is, and “accumulation” begins with the second element. Suppose the input stream is x0, x1, x2, …, then the output stream is
    x0, func(x0, x1, **kwargs), func(func(x0, x1, **kwargs), x2, **kwargs), ...
If initializer is provided (any user-provided value, including None), then the first element of the output stream is
    func(initializer, x0, **kwargs)
hence “accumulation” begins with the first element.
Note
In some other languages or libraries, accumulate takes a stream or sequence and returns a single value as the result. This method, in contrast, returns a value for each element in the stream. In fact, the implementation is a simple application of map().
Examples
>>> ss = Stream(range(7))
>>> print(ss.accumulate(lambda x, y: x + y, 3).collect())
[3, 4, 6, 9, 13, 18, 24]
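The two cases (with and without initializer) can be traced with a small standalone generator. This is a sketch written from the description above, not the library's code; `accumulate_sketch` and `_MISSING` are hypothetical names. Note that, unlike itertools.accumulate with `initial=`, the initializer itself is not emitted.

```python
_MISSING = object()  # sentinel, so that None remains a valid initializer


def accumulate_sketch(items, func, initializer=_MISSING, **kwargs):
    it = iter(items)
    if initializer is _MISSING:
        try:
            acc = next(it)
        except StopIteration:
            return  # empty input, empty output
        yield acc  # the first element is output as is
    else:
        acc = initializer
    for y in it:
        acc = func(acc, y, **kwargs)
        yield acc


with_init = list(accumulate_sketch(range(7), lambda x, y: x + y, 3))
no_init = list(accumulate_sketch(range(7), lambda x, y: x + y))
print(with_init)  # [3, 4, 6, 9, 13, 18, 24]
print(no_init)    # [0, 1, 3, 6, 10, 15, 21]
```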
- buffer(maxsize: int) Self[source]#
Buffer is used to stabilize and improve the speed of data flow.
A buffer is useful following any operation that can not guarantee (almost) instant availability of output. A buffer allows its input to “pile up” when its downstream consumer is slow, so that data is available when the downstream does come to request data. The buffer evens out irregularities in the speeds of upstream production and downstream consumption.
maxsize is the size of the internal buffer.
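The buffering idea can be sketched with a background thread that fills a bounded queue while the consumer is busy. This is an illustration of the concept under simplifying assumptions (no error handling in the producer, no early stop by the consumer), not the library's implementation; `buffered` and `_DONE` are hypothetical names.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the input


def buffered(items, maxsize):
    q = queue.Queue(maxsize)

    def feed():
        for x in items:
            q.put(x)  # blocks while the buffer is full
        q.put(_DONE)

    # The producer runs ahead of the consumer, by up to `maxsize` elements.
    threading.Thread(target=feed, daemon=True).start()
    while True:
        x = q.get()
        if x is _DONE:
            return
        yield x


out = list(buffered(range(10), maxsize=3))
print(out)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```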
- parmap(func: Callable[[T], TT], /, *, concurrency: int | None = None, return_x: bool = False, return_exceptions: bool = False, **kwargs) Self[source]#
Parallel, or concurrent, counterpart of map().
New threads or processes are created to execute func. The function is applied to each element of the data stream and produces a new value, which forms the output stream.
Elements in the output stream are in the order of the input elements. In other words, the order of data elements is preserved.
The main difference between parmap() and map() is that the former executes the function concurrently in background threads or processes, whereas the latter executes a (simple) function in-line.
- Parameters:
- func
A worker function that takes a single input item as the first positional argument and produces a result. Additional keyword args can be passed in via **kwargs.
The main point of func does not have to be its output. It could rather be some side effect, for example, saving data in a database. In that case, the output may be None. Regardless, the output is yielded to be consumed by the next operator in the pipeline. A stream of Nones could be used in counting, for example.
- concurrency
When func is sync, this is the max number of threads or processes created to run func. This is also the max number of concurrent calls to func that can be ongoing at any time.
When func is async, this is the max number of concurrent (i.e. ongoing at the same time) calls to func.
- return_x
If True, the output stream will contain tuples (x, y) of input and output; if False, the output stream will contain y only.
- return_exceptions
If True, exceptions raised by func will be in the output stream as if they were regular results; if False, they will halt the operation and propagate.
Note that a True value does not absorb exceptions raised by previous operators in the pipeline; it is concerned with exceptions raised by func only.
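The meaning of return_x and return_exceptions can be illustrated with a thread pool from the standard library. The helper below is a hypothetical stand-in, not parmap() itself; in particular it submits the whole input up front, which only suits short, finite inputs.

```python
from concurrent.futures import ThreadPoolExecutor


def parmap_sketch(items, func, *, concurrency=4, return_x=False, return_exceptions=False):
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # Pair each input with its future so results can be yielded in input order.
        jobs = [(x, pool.submit(func, x)) for x in items]
        for x, fut in jobs:
            try:
                y = fut.result()
            except Exception as exc:
                if not return_exceptions:
                    raise  # halt and propagate
                y = exc  # the exception object becomes the result
            yield (x, y) if return_x else y


def reciprocal(x):
    return 1 / x


pairs = list(
    parmap_sketch([1, 0, 4], reciprocal, return_x=True, return_exceptions=True)
)
# pairs[0] is (1, 1.0); pairs[1] is (0, <ZeroDivisionError object>); pairs[2] is (4, 0.25)
```

Pairing the exception with its input via return_x makes it easy to tell downstream which element failed.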
- class mpservice.streamer.Batcher[source]#
Bases: Iterable
See Stream.batch().
- class mpservice.streamer.Unbatcher[source]#
Bases: Iterable
See Stream.unbatch(). This is comparable to the standard itertools.chain.from_iterable.
- __init__(instream: Iterable, /)[source]#
The incoming stream consists of lists. This object “expands” or “flattens” the lists into a stream of individual elements. Usually, the output stream is “longer” than the input stream.
This may correspond to a “Batcher” operator upstream, but that is by no means a requirement.
- class mpservice.streamer.Parmapper[source]#
Bases: Iterable
- __init__(instream: Iterable, func: Callable[[T], TT], *, executor: Literal['thread', 'process'], concurrency: int | None = None, return_x: bool = False, return_exceptions: bool = False, preprocessor: Callable | None = None, executor_initializer=None, executor_init_args=(), parmapper_name='parmapper', **kwargs)[source]#
- Parameters:
- executor
Either ‘thread’ or ‘process’.
If executor is 'process', then func must be pickle-able; for example, it can’t be a lambda or a function defined within another function. The same caution applies to any parameter passed to func in kwargs.
- kwargs
Named arguments to func, in addition to the first, positional argument, which is an element of instream.
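A quick way to check the pickle-ability requirement before handing a callable to a process-based executor is to try pickling it. The helper `is_picklable` below is hypothetical and uses only the standard pickle module.

```python
import math
import pickle


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, which process pools rely on."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_picklable(math.sqrt))        # True: importable by name
print(is_picklable(lambda x: x * 2))  # False: a lambda has no importable name
```

Functions are pickled by reference (module plus qualified name), which is why lambdas and functions nested inside other functions fail the check.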
- class mpservice.streamer.EagerBatcher[source]#
Bases: Iterable
EagerBatcher collects items from the incoming stream towards a target batch size and yields the batches. For each batch, after getting the first item, it will yield the batch when either it has collected enough items or timeout has been reached. Note that the timer starts upon getting the first item, whereas getting the first item for a new batch may take arbitrarily long.
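The timeout rule can be sketched as follows. This is a standalone illustration, not EagerBatcher itself; it assumes the input arrives through a queue.Queue terminated by a sentinel, and the names `eager_batches` and `_END` are hypothetical.

```python
import queue
import time

_END = object()  # sentinel marking the end of the input


def eager_batches(q, batch_size, timeout):
    while True:
        first = q.get()  # may wait arbitrarily long for the first item
        if first is _END:
            return
        batch = [first]
        deadline = time.monotonic() + timeout  # timer starts after the first item
        finished = False
        while len(batch) < batch_size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break  # timed out: yield the under-sized batch
            try:
                item = q.get(timeout=remaining)
            except queue.Empty:
                break
            if item is _END:
                finished = True
                break
            batch.append(item)
        yield batch
        if finished:
            return


q = queue.Queue()
for i in range(7):
    q.put(i)
q.put(_END)
batches = list(eager_batches(q, batch_size=3, timeout=0.5))
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Here all items are already queued, so no batch waits for the timeout; with a slow producer, a batch would be yielded under-sized once the deadline passes.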
- mpservice.streamer.fifo_stream(instream: Iterable[T], func: Callable[Concatenate[T, ...], concurrent.futures.Future], *, name: str = 'fifo-stream-feeder-thread', capacity: int = 32, return_x: bool = False, return_exceptions: bool = False, preprocessor: Callable[[T], Any] = None, **kwargs) Iterator[TT | Exception] | Iterator[tuple[T, TT | Exception]][source]#
This is a helper function for preserving order of input/output elements during concurrent processing.
- Parameters:
- func
A function that returns a concurrent.futures.Future object. This function is often a wrapper of the submit method of a concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor, but see mpservice.mpserver.Server.stream for a flexible example.
func takes the data element of instream as the first positional argument, plus optional keyword args passed in via kwargs.
- capacity
The max number of elements that have been fetched from instream but have not been yielded out with results yet. This number may be larger than the max number of concurrent executions of (the function behind) func. If func uses a ThreadPoolExecutor or ProcessPoolExecutor, this capacity does not need to be much larger than the pool size.
Although capacity has a default value, users are encouraged to specify a value that is appropriate for their particular use case.
- preprocessor
A function to be applied to each element of instream, the output of which is passed on to func. If an exception is raised, the exception object becomes the result for that input element (and func is not called on that element). Whether the exception will propagate depends on the value of return_exceptions.
One can imagine two use cases for preprocessor. The first is a data validation “gate”. The function returns the input unchanged if it’s “valid”, or raises an exception otherwise. This short-circuits bad data and prevents it from being processed by func (which could involve multiprocessing and pickling).
In the second use case, preprocessor extracts part of the data element as the real input to func. This is supposed to be used along with return_x=True; the original data element (rather than the output of preprocessor) is returned. By this mechanism, only part of the data element is acted upon by func, while the full element flows on to subsequent steps.
The application of this callable happens in the current process/thread. It may be a lambda function. Usually this callable should be light weight. It should not modify its input.
- async mpservice.streamer.async_fifo_stream(instream: AsyncIterable[T], func: Callable[Concatenate[T, ...], Awaitable[asyncio.Future]], *, name: str = 'async-fifo-stream-feeder-task', capacity: int = 128, return_x: bool = False, return_exceptions: bool = False, preprocessor: Callable[[T], Any] = None, **kwargs) AsyncIterator[TT | Exception] | Iterator[tuple[T, TT | Exception]][source]#
Analogous to fifo_stream() except for using an async worker function in an async context.
- mpservice.streamer.tee(instream: Iterable[Elem], n: int = 2, /, *, buffer_size: int = 256) tuple[Stream[Elem], ...][source]#
tee produces multiple (default 2) “copies” of the input data stream, to be used in different ways.
Suppose we have a data stream, on which we want to apply two different lines of operations, conceptually like this:
    data = [...]
    stream_1 = Stream(data).map(...).batch(...).parmap(...)...
    stream_2 = Stream(data).buffer(...).parmap(...).map(...)...
There are a few ways we can do this:
- Revise the code to apply multiple operations on each data element, that is, “merging” the two lines of operations. This will likely make the code more complex.
- Apply the first line of operations; then re-walk the data stream to apply the second line of operations. When feasible, this is simple and clean. However, this approach could be infeasible, for example, when walking the data stream is expensive. There are also situations where it’s not possible to get the data stream a second time.
- Use the current function:
    data = [...]
    stream_1, stream_2 = tee(data, 2)
The Stream operations can be applied to the two streams freely and independently (after all, they are proper Stream objects), except for the final “trigger” operations __iter__(), collect(), and drain():
    stream_1.map(...).batch(...).parmap(...)...
    stream_2.buffer(...).parmap(...).map(...)...
Upon this setup, we need to start “consuming” the two streams at once, letting them run concurrently, and they will also finish around the same time. The concurrent “triggers” need to be in different threads. Suppose stream_1 is run for some side effect, hence we can simply drain it; for stream_2, on the other hand, suppose we need its outgoing elements. We may do this:
    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor() as pool:
        t1 = pool.submit(stream_1.drain)
        t2 = pool.submit(stream_2.collect)
        n = t1.result()
        output = t2.result()
The opposite of tee is the built-in zip.
- Parameters:
- buffer_size
Size of an internal buffer, in terms of the number of elements of instream it holds. Unless each element takes a lot of memory, it is recommended to use a large buffer size, such as the default 256.
Think of the buffer as a “moving window” on instream. The n “forks” produced by this function iterate over instream independently, subject to one constraint: at any moment, the next element to be obtained by each fork is an element in this window. Once the first element in the window has been obtained by all n forks, the window will advance by one element. (In detail, the oldest element in the window is dropped; the next element is obtained from instream and appended to the window as the newest element.)
This moving window gives the “forks” wiggle room in their consumption of the data: they do not have to be processing the same element at the same time, and their respective concurrent processing may be slow at different elements. A larger buffer window reduces the need for the forks to wait for their slower peers (if the slowness is on random elements rather than on every element).