Stream processing using streamer

The module mpservice.streamer provides utilities for stream processing with threading or multiprocessing (or asyncio to a lesser degree).

An input data stream goes through a series of operations. The output from one operation becomes the input to the next operation. One or more “primary” operations are so heavy that they can benefit from concurrency via threading (if they are I/O bound) or multiprocessing (if they are CPU bound). The other operations are typically lightweight, although important in their own right. These operations perform batching, unbatching, buffering, mapping (simple transformation), filtering, grouping, etc.

To fix terminology, we’ll call the main methods of the class Stream “operators” or “operations”. Each operator adds a “streamlet”. The behavior of a Stream object is embodied by its chain of streamlets, which is accessible via the public attribute Stream.streamlets (although there is little need to access it). “Consumption” of the stream entails “pulling” at the end of the last streamlet, which, in a chain reaction, pulls each data element through the entire series of streamlets or operators.

Introduction

Let’s make up an I/O-bound operation which takes an input and produces an output. (Think of calling a remote service with an input and waiting for a result.)

>>> from time import sleep
>>> from random import random
>>> def double(x):
...     sleep(random() * 0.1)
...     return x * 2

Suppose we have a long stream of input values we want to process. We feed this stream into a Stream object:

>>> from mpservice.streamer import Stream
>>> data_stream = Stream(range(100))

The input stream is often a list, but more generally, it can be any Iterable, possibly unlimited. Since double is an I/O-bound operation, let’s use multiple threads to speed up the processing of the input stream. For this purpose, we add a parmap() (i.e. “parallel map”) operator to the stream:

>>> data_stream.parmap(double, executor='thread', concurrency=8)  
<mpservice.streamer._streamer.Stream object at 0x7...>

This requests the function double to be run in 8 threads; they will collectively process the input stream. Adding the operator is just “setup”: nothing runs until we start to retrieve results. Later we’ll see that we can add more than one operator, and there are other types of operators. Because a Stream is an Iterable, “retrieving the results” usually amounts to iterating over it:

>>> total = 0
>>> for y in data_stream:
...     total += y
>>> total
9900

What is the expected result?

>>> sum((v*2 for v in range(100)))
9900

This confirms the result is correct.

Despite the concurrency in the operation, the order of the input elements is preserved. In other words, the output elements correspond to the input elements in order. Let’s verify:

>>> data_stream = Stream(range(100)).parmap(double, executor='thread', concurrency=8)
>>> for k, y in enumerate(data_stream):  
...     print(y, end='  ')  
...     if (k + 1) % 10 == 0:  
...         print('')  
>>> print('')
0  2  4  6  8  10  12  14  16  18
20  22  24  26  28  30  32  34  36  38
40  42  44  46  48  50  52  54  56  58
60  62  64  66  68  70  72  74  76  78
80  82  84  86  88  90  92  94  96  98
100  102  104  106  108  110  112  114  116  118
120  122  124  126  128  130  132  134  136  138
140  142  144  146  148  150  152  154  156  158
160  162  164  166  168  170  172  174  176  178
180  182  184  186  188  190  192  194  196  198

Note that we had to re-create the streamer object because, after the first iteration, the stream was “consumed” and gone. Also note that an operator can be added either in a statement of its own or as part of a “chained” call. (An operator method modifies self and also returns self, which is what makes chained calls possible.)

Suppose we want to follow the heavy double operation with a shift of each element:

>>> def shift(x, amount):
...     return x + amount

This is quick and easy, so we do it “in-line” with map():

>>> data_stream = Stream(range(20))
>>> data_stream.parmap(double, executor='thread', concurrency=8)  
<mpservice.streamer._streamer.Stream object at 0x7...>
>>> data_stream.map(shift, amount=0.8)  
<mpservice.streamer._streamer.Stream object at 0x7...>
>>> for k, y in enumerate(data_stream):  
...     print(y, end='  ')  
...     if (k + 1) % 10 == 0:  
...         print('')  
>>> print('')
0.8  2.8  4.8  6.8  8.8  10.8  12.8  14.8  16.8  18.8
20.8  22.8  24.8  26.8  28.8  30.8  32.8  34.8  36.8  38.8

The first three lines are equivalent to this one line:

>>> data_stream = Stream(range(20)).parmap(double, executor='thread', concurrency=8).map(shift, amount=0.8)

Operators

Stream has many “operators”. They can be characterized in a few ways:

One-to-one (will not change the elements’ count or order): map(), accumulate(), peek(), buffer(), parmap()
One-to-one (will change the elements’ order, but not count): shuffle()
Many-to-one (may shrink the stream): groupby(), batch()
One-to-many (may expand the stream): unbatch()
Selection or filtering (may drop elements): filter(), filter_exceptions(), head(), tail()
Concurrent (using threads or processes): parmap()
Read-only (will not change the elements): peek(), buffer()

All these methods preserve the order of the elements, with the sole exception of shuffle().

The operation in parmap is expected to be heavy and expensive. All the other operations are meant to be lightweight and simple.

These methods can be called either as a single statement, or in a “chained” fashion. They “set up”, or “add”, operators to the streamer. However, they do not start executing the operations.

The operators that have been added to a Stream will start once we start to “consume” the stream, that is, to retrieve elements of the stream. Compared to thinking “the operators start”, it’s more intuitive to think “the elements start to flow” through the operators in the order they have been added. The “consuming” methods are “pulling” at the end of the final operator.
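
The distinction between “setup” and “consumption” can be illustrated with plain Python generators. This is only an analogy built on the standard library, not how Stream is implemented; the names traced_map, log, and pipeline are made up for the illustration.

```python
log = []  # records each time an operator actually runs


def traced_map(name, func, instream):
    """A lazy one-to-one operator that records every call it makes."""
    for x in instream:
        log.append(f"{name}({x})")
        yield func(x)


# "Setup": chaining the operators does not run anything yet.
pipeline = traced_map(
    "shift", lambda x: x + 1,
    traced_map("double", lambda x: x * 2, range(3)),
)
calls_after_setup = list(log)

# "Consumption": pulling at the end drags each element through the chain.
results = list(pipeline)
```

After setup the log is still empty. Once the pipeline is consumed, the log alternates between the two operators, because each element passes through the whole chain before the next one is pulled.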

There are several ways to consume the stream:

  • Iterate over the Stream object, because it implements __iter__().

  • Call the method collect() to get all the elements in a list—if you know there are not too many of them!

  • Call the method drain() to “finish off” the operations. This does not return the elements of the stream, but rather just the count of them. This is used when the final operator exists mainly for a side effect, such as saving things to a database.

The latter two methods are trivial applications of the first.
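
As a rough illustration of why collect() and drain() are “trivial applications” of iteration, here is a sketch written against any iterable. The free functions below are stand-ins for the methods and are assumptions for illustration, not the library’s code.

```python
def collect(stream):
    """Gather every element into a list (only sensible for small streams)."""
    return list(stream)


def drain(stream):
    """Pull every element through and return only the count."""
    n = 0
    for _ in stream:
        n += 1
    return n


# The final "operator" here exists only for its side effect (saving).
saved = []
count = drain(saved.append(x) for x in range(5))
doubled = collect(x * 2 for x in range(3))
```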

Additional utilities separate from the class Stream

A few of the stream operators are exposed at the module level, so that they can be used standalone. These include Batcher, Unbatcher, and Parmapper.

There is a module function tee(), which is analogous to the standard itertools.tee.

The class EagerBatcher is analogous to Batcher but has a timeout, which controls how long to wait before yielding an under-sized batch.

The classes IterableQueue, ProcessRunner and ProcessRunee are unrelated to Stream. They have their own use cases.

There are two helper functions fifo_stream() and async_fifo_stream() that can be useful independent of Stream. They implement a pattern that preserves element order in concurrent processing.

API reference

class mpservice.streamer.Stream[source]#

Bases: Iterable[Elem]

The class Stream is the “entry-point” for the “streamer” utilities. A user constructs a Stream object by passing an Iterable to it, then calls its methods to use it. Most of the methods return the object itself, facilitating calls in a “chained” fashion, like this:

s = Stream(...).map(...).filter(...).batch(...).parmap(...).unbatch(...)

However, these methods modify the object in-place, hence the above is equivalent to calling the methods one by one:

s = Stream(...)
s.map(...)
s.filter(...)
s.batch(...)
s.parmap(...)
s.unbatch(...)
__init__(instream: Iterable, /)[source]#
Parameters:
instream

The input stream of elements, possibly unlimited.

__iter__() Iterator[Elem][source]#
drain() int[source]#

Drain off the stream and return the number of elements processed.

This method is for the side effect: the entire stream has been processed by all the operations and results have been taken care of, for example, the final operation may have saved results in a database.

If you need more info about the processing, such as inspecting exceptions as they happen (if return_exceptions to parmap() is True), then don’t use this method. Instead, iterate over the stream yourself and do whatever you need to do.

collect() list[Elem][source]#

Return all the elements in a list.

Warning

Do not call this method on “big data”.

map(func: Callable[[T], Any], /, **kwargs) Self[source]#

Perform a simple transformation on each data element.

This operation happens “inline”: no other threads or processes are used. For this reason, the method is for “light-weight” transforms.

This is a 1-to-1 transform from the input stream to the output stream. This method can neither add nor skip elements in the stream.

If the logic needs to keep some state or history info, then define a class and implement its __call__ method. For example, to print out every hundredth value for information:

class Peek:
    def __init__(self):
        self._count = 0

    def __call__(self, x):
        self._count += 1
        if self._count == 100:
            print(x)
            self._count = 0
        return x

obj.map(Peek())

(This functionality is already provided by peek().)

Parameters:
func

A function that takes a data element and returns a new value; the new values (which do not have to differ from the original) form the new stream going forward.

**kwargs

Additional keyword arguments to func, after the first argument, which is the data element.

filter(func: Callable[[T], bool], /, **kwargs) Self[source]#

Select data elements to keep in the stream according to the predicate func.

This method can be used to either “keep” or “drop” elements according to various conditions. If the logic needs to keep some state or history info, then define a class and implement its __call__ method. For example, to “drop the first 100 elements”:

class Drop:
    def __init__(self, n):
        self._n = n
        self._count = 0

    def __call__(self, x):
        self._count += 1
        if self._count <= self._n:
            return False
        return True

obj.filter(Drop(100))
Parameters:
func

A function that takes a data element and returns True or False to indicate whether the element should be kept in or dropped from the stream, respectively.

This function should not make changes to the input data element.

**kwargs

Additional keyword arguments to func, after the first argument, which is the data element.

filter_exceptions(drop_exc_types: type[BaseException] | tuple[type[BaseException], ...] | None = None, keep_exc_types: type[BaseException] | tuple[type[BaseException], ...] | None = None) Self[source]#

If a call to parmap() upstream has specified return_exceptions=True, then its output stream may contain Exception objects. Other methods such as map() may also deliberately capture and return Exception objects.

filter_exceptions determines which Exception objects in the stream should be dropped, kept, or raised.

While parmap() and other operators can choose to continue processing the stream by returning rather than raising exceptions, a subsequent filter_exceptions drops known types of exception objects so that the next operator does not receive Exception objects as inputs.

The default behavior (both drop_exc_types and keep_exc_types are None) is to raise any Exception object that is encountered.

A useful pattern is to specify one or a few known exception types to drop, and crash on any other unexpected exception.

Parameters:
drop_exc_types

These types of exceptions are dropped from the stream. These should be one or a few carefully identified exception types that you know can be safely ignored.

If None (the default) or () or [], no exception object is dropped.

To drop all exceptions, use Exception or even BaseException.

keep_exc_types

These types of exceptions are kept in the stream.

If None (the default), no exception object is kept.

To keep all exceptions, use Exception or even BaseException.

An exception object that is neither kept nor dropped will be raised.

Note

The members in keep_exc_types and drop_exc_types should be distinct. If there is any common member, then it is kept because the keep_exc_types condition is checked first.
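
The keep/drop/raise rules described above can be sketched as a standalone generator. This is a minimal illustration under the stated rules (keep is checked first, then drop, otherwise raise); the free function and the helper _as_tuple are assumptions for illustration, not the library’s implementation.

```python
def _as_tuple(exc_types):
    """Normalize None, a single type, or a sequence of types to a tuple."""
    if exc_types is None:
        return ()
    if isinstance(exc_types, type):
        return (exc_types,)
    return tuple(exc_types)


def filter_exceptions(instream, drop_exc_types=None, keep_exc_types=None):
    drop = _as_tuple(drop_exc_types)
    keep = _as_tuple(keep_exc_types)
    for x in instream:
        if not isinstance(x, BaseException):
            yield x          # regular data passes through
        elif isinstance(x, keep):
            yield x          # "keep" is checked before "drop"
        elif isinstance(x, drop):
            continue         # silently dropped
        else:
            raise x          # neither kept nor dropped


items = [1, ValueError("bad"), 2, KeyError("k"), 3]
kept = list(filter_exceptions(items, drop_exc_types=ValueError,
                              keep_exc_types=KeyError))

# With the defaults, the first exception object encountered is raised.
try:
    list(filter_exceptions(items))
    raised_by_default = False
except ValueError:
    raised_by_default = True
```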

peek(*, print_func: Callable[[str], None] | None = None, interval: int | float = 1, exc_types: Sequence[type[BaseException]] | None = BaseException, with_exc_tb: bool = True, prefix: str = '', suffix: str = '') Self[source]#

Take a peek at the data element before it continues in the stream.

This is implemented by map(), where the mapper function prints some info under certain conditions before returning the input value unchanged. If this does not do what you need, just create your own function to pass to map().

Parameters:
print_func

A function that will be used to print messages. This should take a str and return nothing.

The default is the built-in print. It’s often useful to pass in a logging function such as logger.info.

interval

Print out the data element at this interval. The default is 1 (per the signature above), that is, print every element. Pass in 1000, for example, to print every 1000th element.

If it is a float, then it must be strictly between 0 and 1 (both ends excluded). It is taken as the (target) fraction of elements that are printed.

To print out every element, pass in 1.

To turn off the printouts by this condition, pass in None.

exc_types

If the element is an exception object of these types, print it out. This is regardless of the interval value.

If BaseException (the default), all exception objects are printed.

To turn off the printouts by this condition, pass in None or () or []. In this case, interval is the only criterion for determining whether an element should be printed, regardless of whether the element is an Exception.

with_exc_tb

If True, the traceback (if available) is printed whenever an Exception object is printed, whether the printing was triggered by interval or by exc_types.
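
To make the interplay of interval and exc_types concrete, here is a hedged sketch of a mapper that could be passed to map(). The class Peeker and its message format are hypothetical; the traceback, prefix, and suffix handling of the real peek() is left out.

```python
import random


class Peeker:
    """Hypothetical stand-in for the mapper function used by peek()."""

    def __init__(self, print_func=print, interval=1, exc_types=BaseException):
        self._print = print_func
        self._interval = interval
        if exc_types is None:
            exc_types = ()
        elif isinstance(exc_types, type):
            exc_types = (exc_types,)
        self._exc_types = tuple(exc_types)
        self._idx = 0

    def __call__(self, x):
        self._idx += 1
        if isinstance(x, self._exc_types):
            show = True                       # exceptions ignore `interval`
        elif self._interval is None:
            show = False                      # interval-based printing is off
        elif isinstance(self._interval, float):
            show = random.random() < self._interval  # target fraction
        else:
            show = self._idx % self._interval == 0   # every N-th element
        if show:
            self._print(f"#{self._idx}: {x!r}")
        return x  # the element continues unchanged


messages = []
values = list(map(Peeker(messages.append, interval=3), range(1, 10)))

exc_messages = []
list(map(Peeker(exc_messages.append, interval=None), [1, ValueError("x")]))
```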

shuffle(buffer_size: int = 1000) Self[source]#

Shuffle the elements using a buffer as intermediate storage.

If buffer_size is >= the number of the elements, then the shuffling is fully random. Otherwise, the shuffling is somewhat “localized”.
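
One common way to shuffle with a bounded buffer is sketched below; it shows why a small buffer gives only “localized” shuffling. This is an assumed technique for illustration (the function name and the rng parameter are made up), not necessarily what shuffle() does internally.

```python
import random


def buffered_shuffle(instream, buffer_size=1000, rng=random):
    buffer = []
    for x in instream:
        if len(buffer) < buffer_size:
            buffer.append(x)              # still filling the buffer
        else:
            k = rng.randrange(buffer_size)
            yield buffer[k]               # emit a random resident...
            buffer[k] = x                 # ...and let x take its slot
    rng.shuffle(buffer)                   # flush what is left, in random order
    yield from buffer


local = list(buffered_shuffle(range(10), buffer_size=3, rng=random.Random(0)))
full = list(buffered_shuffle(range(10), buffer_size=100, rng=random.Random(0)))
```

With buffer_size=3, the element emitted at output position j can only come from the first j + 3 inputs, so elements never move far ahead of their original position. With a buffer at least as large as the stream, the whole stream is shuffled at once.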

head(n: int) Self[source]#

Take the first n elements and ignore the rest. If the entire stream has fewer than n elements, just take all of them.

This does not delegate to filter, because filter would need to walk through the entire stream, which is not needed for head.

tail(n: int) Self[source]#

Take the last n elements and ignore all the previous ones. If the entire stream has fewer than n elements, just take all of them.

Note

n data elements need to be kept in memory, hence n should not be “too large” for the typical size of the data elements.
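
The different costs of head and tail can be seen in a standard-library sketch: head can stop pulling early, while tail has to walk the whole stream but holds at most n elements at a time. The free functions here are illustrative stand-ins, not the library’s code.

```python
from collections import deque
from itertools import islice


def head(instream, n):
    """Yield the first n elements, then stop pulling from the input."""
    return islice(instream, n)


def tail(instream, n):
    """Walk the whole input, remembering only the latest n elements."""
    yield from deque(instream, maxlen=n)


pulled = []  # records what was actually requested from the source


def source():
    for x in range(1000):
        pulled.append(x)
        yield x


first_three = list(head(source(), 3))
last_three = list(tail(range(10), 3))
```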

groupby(key: Callable[[T], Any], /, **kwargs) Self[source]#

key takes a data element and outputs a value. Consecutive elements that have the same value of this output will be yielded as a generator.

Following this operator, every element in the output stream is a tuple with the key and the values (as a generator).

This function is similar to the standard itertools.groupby.

The output of key is usually a str or int or a tuple of a few strings or ints.

**kwargs are additional keyword arguments to key.

Examples

>>> data = ['atlas', 'apple', 'answer', 'bee', 'block', 'away', 'peter', 'question', 'plum', 'please']
>>> print(Stream(data).groupby(lambda x: x[0]).map(lambda x: list(x[1])).collect())
[['atlas', 'apple', 'answer'], ['bee', 'block'], ['away'], ['peter'], ['question'], ['plum', 'please']]
batch(batch_size: int) Self[source]#

Take elements from an input stream and bundle them up into batches up to a size limit, and produce the batches as lists.

The output batches are all of the specified size, except possibly the final batch. There is no ‘timeout’ logic to proceed eagerly with a partial batch. For efficiency, this requires the input stream to have a steady supply. If that is a concern, having a buffer() on the input stream prior to batch() may help.

Examples

>>> ss = Stream(range(10)).batch(3)
>>> print(ss.collect())
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
unbatch() Self[source]#

Turn a stream of lists into a stream of individual elements.

This is sometimes used to correspond with a previous batch(), but that is by no means a requirement. The only requirement is that the input elements are lists (or, more generally, Iterables; see further below).

unbatch can be combined with map() to implement “expanding” a stream, like this:

>>> def explode(x):
...     if isinstance(x, int) and x > 0:
...         return [x] * x
...     return []
>>> ss = Stream([0, 1, 2, 'a', -1, 'b', 3, 4]).map(explode).unbatch()
>>> print(ss.collect())
[1, 2, 2, 3, 3, 3, 4, 4, 4, 4]

In fact, elements of the input stream do not have to be lists. They can be any Iterable. For example:

>>> def expand(n):
...     for _ in range(n):
...         yield n
>>>
>>> stream = Stream((1, 2, 4, 3, 0, 5)).map(expand).unbatch().collect()
>>> print(stream)
[1, 2, 2, 4, 4, 4, 4, 3, 3, 3, 5, 5, 5, 5, 5]
accumulate(func: Callable[[Any, T], Any], initializer: Any = <object object>, **kwargs) Self[source]#

This method is like “cumulative sum”, but the operation is specified by func, hence does not need to be “sum”. If the last element in the output stream is x and the upcoming element in the input stream is y, then the next element in the output stream is

func(x, y, **kwargs)

If initializer is not provided, then the first element is output as is, and “accumulation” begins with the second element. Suppose the input stream is x0, x1, x2, …, then the output stream is

x0, func(x0, x1, **kwargs), func(func(x0, x1, **kwargs), x2, **kwargs), ...

If initializer is provided (any user-provided value, including None), then the first element of the output stream is

func(initializer, x0, **kwargs)

hence “accumulation” begins with the first element.

Note

In some other languages or libraries, accumulate takes a stream or sequence and returns a single value as the result. This method, in contrast, returns a value for each element in the stream. In fact, the implementation is a simple application of map().

Examples

>>> ss = Stream(range(7))
>>> print(ss.accumulate(lambda x, y: x + y, 3).collect())
[3, 4, 6, 9, 13, 18, 24]
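
Since the note above says accumulate is a simple application of map(), the idea can be sketched with a stateful callable and the built-in map. The class Accumulator and the sentinel _MISSING are hypothetical names used only for this illustration.

```python
_MISSING = object()  # sentinel meaning "no initializer was given"


class Accumulator:
    def __init__(self, func, initializer=_MISSING, **kwargs):
        self._func = func
        self._value = initializer
        self._kwargs = kwargs

    def __call__(self, x):
        if self._value is _MISSING:
            self._value = x  # the first element is output as is
        else:
            self._value = self._func(self._value, x, **self._kwargs)
        return self._value


with_init = list(map(Accumulator(lambda x, y: x + y, 3), range(7)))
without_init = list(map(Accumulator(lambda x, y: x + y), range(7)))
```

Note that this differs from the standard itertools.accumulate with its initial argument, which emits the initial value itself as the first output element.
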
buffer(maxsize: int) Self[source]#

Buffer is used to stabilize and improve the speed of data flow.

A buffer is useful following any operation that cannot guarantee (almost) instant availability of output. A buffer allows its input to “pile up” when its downstream consumer is slow, so that data is available when the downstream does come to request data. The buffer evens out irregularities in the speeds of upstream production and downstream consumption.

maxsize is the size of the internal buffer.
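
A buffer of this kind is commonly built from a bounded queue filled by a background thread. The sketch below assumes that design for illustration; it is not taken from the library, and for brevity it does not forward errors raised by the upstream.

```python
import queue
import threading

_END = object()  # marks the end of the input


def buffered(instream, maxsize):
    q = queue.Queue(maxsize)

    def feed():
        try:
            for x in instream:
                q.put(x)   # blocks while the buffer is full
        finally:
            q.put(_END)

    # The feeder runs ahead of the consumer by up to `maxsize` elements.
    threading.Thread(target=feed, daemon=True).start()
    while True:
        x = q.get()        # blocks while the buffer is empty
        if x is _END:
            return
        yield x


out = list(buffered(range(10), maxsize=3))
```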

parmap(func: Callable[[T], TT], /, *, concurrency: int | None = None, return_x: bool = False, return_exceptions: bool = False, **kwargs) Self[source]#

Parallel, or concurrent, counterpart of map().

New threads or processes are created to execute func. The function is applied to each element of the data stream and produces a new value; these values form the output stream.

Elements in the output stream are in the order of the input elements. In other words, the order of data elements is preserved.

The main difference between parmap() and map() is that the former executes the function concurrently in background threads or processes, whereas the latter executes a (simple) function in-line.

Parameters:
func

A worker function that takes a single input item as the first positional argument and produces a result. Additional keyword args can be passed in via **kwargs.

The main point of func does not have to be its output; it could instead be some side effect, for example, saving data in a database. In that case, the output may be None. Regardless, the output is yielded to be consumed by the next operator in the pipeline. A stream of Nones could be used in counting, for example.

concurrency

When func is sync, this is the max number of threads or processes created to run func. This is also the max number of concurrent calls to func that can be ongoing at any time.

When func is async, this is the max number of concurrent (i.e. ongoing at the same time) calls to func.

return_x

If True, output stream will contain tuples (x, y); if False, output stream will contain y only.

return_exceptions

If True, exceptions raised by func will be in the output stream as if they were regular results; if False, they will halt the operation and propagate.

Note that a True value does not absorb exceptions raised by previous operators in the pipeline; it applies only to exceptions raised by func.
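
The effect of return_x and return_exceptions can be sketched with a thread pool from the standard library. The function parmap_sketch is a made-up stand-in, not the library’s implementation; in particular, it submits the whole input up front, which would not suit an unlimited stream.

```python
from concurrent.futures import ThreadPoolExecutor


def parmap_sketch(instream, func, *, concurrency=4,
                  return_x=False, return_exceptions=False):
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # Futures are kept in input order, so results come out in order too.
        pairs = [(x, pool.submit(func, x)) for x in instream]
        for x, fut in pairs:
            try:
                y = fut.result()
            except Exception as exc:
                if not return_exceptions:
                    raise          # halt and propagate
                y = exc            # treat the exception as a regular result
            yield (x, y) if return_x else y


def reciprocal(x):
    return 1 / x


pairs = list(parmap_sketch([1, 0, 4], reciprocal,
                           return_x=True, return_exceptions=True))

try:
    list(parmap_sketch([1, 0, 4], reciprocal))
    propagated = False
except ZeroDivisionError:
    propagated = True
```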

class mpservice.streamer.Batcher[source]#

Bases: Iterable

See Stream.batch().

__init__(instream: Iterable, /, batch_size: int)[source]#
__iter__()[source]#
class mpservice.streamer.Unbatcher[source]#

Bases: Iterable

See Stream.unbatch(). This is comparable to the standard itertools.chain.from_iterable.

__init__(instream: Iterable, /)[source]#

The incoming stream consists of lists. This object “expands” or “flattens” the lists into a stream of individual elements. Usually, the output stream is “longer” than the input stream.

This may correspond to a “Batcher” operator upstream, but that is by no means a requirement.

__iter__()[source]#
class mpservice.streamer.Parmapper[source]#

Bases: Iterable

__init__(instream: Iterable, func: Callable[[T], TT], *, executor: Literal['thread', 'process'], concurrency: int | None = None, return_x: bool = False, return_exceptions: bool = False, preprocessor: Callable | None = None, executor_initializer=None, executor_init_args=(), parmapper_name='parmapper', **kwargs)[source]#
Parameters:
executor

Either ‘thread’ or ‘process’.

If executor is 'process', then func must be pickle-able, for example, it can’t be a lambda or a function defined within another function. The same caution applies to any parameter passed to func in kwargs.

kwargs

Named arguments to func, in addition to the first, positional argument, which is an element of instream.

__iter__()[source]#
class mpservice.streamer.EagerBatcher[source]#

Bases: Iterable

EagerBatcher collects items from the incoming stream towards a target batch size and yields the batches. For each batch, after getting the first item, it yields the batch as soon as it has either collected enough items or reached the timeout. Note that the timer starts upon getting the first item, whereas getting the first item for a new batch may take arbitrarily long.

__init__(instream, /, batch_size: int, timeout: float | None = None, endmarker=None)[source]#
__iter__()[source]#
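
The timing rule described above (no limit on the wait for the first item, then a timer for the rest of the batch) can be sketched as follows. The sketch assumes the input is a standard queue.Queue terminated by an end marker; the function eager_batches is hypothetical and the input type expected by the real class is not shown here.

```python
import queue
import time


def eager_batches(q, batch_size, timeout, endmarker=None):
    """Yield lists of up to `batch_size` items taken from the queue `q`."""
    while True:
        first = q.get()  # no time limit on the first item of a batch
        if first is endmarker:
            return
        batch = [first]
        deadline = time.monotonic() + timeout  # the timer starts only now
        finished = False
        while len(batch) < batch_size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                item = q.get(timeout=remaining)
            except queue.Empty:
                break  # timed out: yield the under-sized batch
            if item is endmarker:
                finished = True
                break
            batch.append(item)
        yield batch
        if finished:
            return


q = queue.Queue()
for item in [0, 1, 2, 3, 4, 5, 6, None]:  # None marks the end of the stream
    q.put(item)
batches = list(eager_batches(q, batch_size=3, timeout=0.1))
```
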
mpservice.streamer.fifo_stream(instream: Iterable[T], func: Callable[Concatenate[T, ...], concurrent.futures.Future], *, name: str = 'fifo-stream-feeder-thread', capacity: int = 32, return_x: bool = False, return_exceptions: bool = False, preprocessor: Callable[[T], Any] = None, **kwargs) Iterator[TT | Exception] | Iterator[tuple[T, TT | Exception]][source]#

This is a helper function for preserving order of input/output elements during concurrent processing.

Parameters:
func

A function that returns a concurrent.futures.Future object. This function is often a wrapper of the submit method of a concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor, but see mpservice.mpserver.Server.stream for a flexible example.

func takes the data element of instream as the first positional argument, plus optional keyword args passed in via kwargs.

capacity

The max number of elements that have been fetched from instream but have not been yielded out with results yet. This number may be larger than the max number of concurrent executions of (the function behind) func. If func uses a ThreadPoolExecutor or ProcessPoolExecutor, this capacity does not need to be much larger than the pool size.

Although capacity has a default value, users are advised to specify a value that is appropriate for their particular use case.

preprocessor

A function to be applied to each element of instream, the output of which is passed on to func. If an exception is raised, the exception object becomes the result for that input element (and func is not called on that element). Whether the exception will propagate depends on the value of return_exceptions.

One can imagine two use cases for preprocessor. The first is a data validation “gate”. The function returns the input unchanged if it’s “valid”, or raises an exception otherwise. This short-circuits bad data and prevents it from being passed to func (which could involve multiprocessing and pickling).

In the second use case, preprocessor extracts part of the data element as the real input to func. This is supposed to be used along with return_x=True; the original data element (rather than the output of preprocessor) is returned. By this mechanism, only part of the data element is acted upon by func, while the full element flows on to subsequent steps.

The application of this callable happens in the current process/thread. It may be a lambda function. Usually this callable should be lightweight. It should not modify its input.
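
The order-preserving pattern, together with capacity and preprocessor, can be sketched with a queue of pending futures. This simplified version feeds the input inline rather than from a feeder thread, and the names fifo_stream_sketch, require_non_negative, and submit are made up for the illustration; it is not the library’s implementation.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def fifo_stream_sketch(instream, func, *, capacity=32, return_x=False,
                       return_exceptions=False, preprocessor=None):
    """Yield results in input order; `func(x)` must return a Future."""
    pending = deque()  # (x, future, preprocessor_error), in input order

    def resolve(x, fut, exc):
        if exc is None:
            try:
                y = fut.result()  # blocks until this element is done
            except Exception as e:
                exc = e
        if exc is not None:
            if not return_exceptions:
                raise exc
            y = exc
        return (x, y) if return_x else y

    for x in instream:
        if len(pending) >= capacity:
            yield resolve(*pending.popleft())  # make room, oldest first
        fut = exc = None
        arg = x
        if preprocessor is not None:
            try:
                arg = preprocessor(x)
            except Exception as e:
                exc = e  # bad input: func is never called on it
        if exc is None:
            fut = func(arg)
        pending.append((x, fut, exc))
    while pending:
        yield resolve(*pending.popleft())


def require_non_negative(x):
    """A validation 'gate': pass valid input through, reject the rest."""
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x


with ThreadPoolExecutor(max_workers=4) as pool:
    def submit(x):
        return pool.submit(lambda v: v * 2, x)

    in_order = list(fifo_stream_sketch(range(20), submit, capacity=8))
    gated = list(fifo_stream_sketch([1, -2, 3], submit,
                                    return_exceptions=True,
                                    preprocessor=require_non_negative))
```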

async mpservice.streamer.async_fifo_stream(instream: AsyncIterable[T], func: Callable[Concatenate[T, ...], Awaitable[asyncio.Future]], *, name: str = 'async-fifo-stream-feeder-task', capacity: int = 128, return_x: bool = False, return_exceptions: bool = False, preprocessor: Callable[[T], Any] = None, **kwargs) AsyncIterator[TT | Exception] | Iterator[tuple[T, TT | Exception]][source]#

Analogous to fifo_stream() except for using an async worker function in an async context.

mpservice.streamer.tee(instream: Iterable[Elem], n: int = 2, /, *, buffer_size: int = 256) tuple[Stream[Elem], ...][source]#

tee produces multiple (default 2) “copies” of the input data stream, to be used in different ways.

Suppose we have a data stream, on which we want to apply two different lines of operations, conceptually like this:

data = [...]
stream_1 = Stream(data).map(...).batch(...).parmap(...)...
stream_2 = Stream(data).buffer(...).parmap(...).map(...)...

There are a few ways we can do this:

  1. Revise the code to apply multiple operations on each data element, that is, “merging” the two lines of operations. This will likely make the code more complex.

  2. Apply the first line of operations; then re-walk the data stream to apply the second line of operations. When feasible, this is simple and clean. However, this approach could be infeasible, for example, when walking the data stream is expensive. There are also situations where it’s not possible to get the data stream a second time.

  3. Use the current function:

    data = [...]
    stream_1, stream_2 = tee(data, 2)
    

    Stream operations can be applied to the two streams freely and independently (after all, they are proper Stream objects), except for the final “trigger” operations __iter__(), collect(), and drain():

    stream_1.map(...).batch(...).parmap(...)...
    stream_2.buffer(...).parmap(...).map(...)...
    

    Upon this setup, we need to start “consuming” the two streams at once, letting them run concurrently; they will also finish around the same time. The concurrent “triggers” need to be in different threads. Suppose stream_1 is run for some side effect, hence we can simply drain it; for stream_2, on the other hand, suppose we need its output elements. We may do this:

    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor() as pool:
        t1 = pool.submit(stream_1.drain)
        t2 = pool.submit(stream_2.collect)
        n = t1.result()
        output = t2.result()
    

The opposite of tee is the built-in zip.

Parameters:
buffer_size

Size of an internal buffer, in terms of the number of elements of instream it holds. Unless each element takes a lot of memory, it is recommended to use a large buffer size, such as the default 256.

Think of the buffer as a “moving window” on instream. The n “forks” produced by this function are iterating over instream independently subject to one constraint: at any moment, the next element to be obtained by each fork is an element in this window. Once the first element in the window has been obtained by all n forks, the window will advance by one element. (In detail, the oldest element in the window is dropped; the next element is obtained from instream and appended to the window as the newest element.)

This moving window gives the “forks” wiggle room in their consumption of the data—they do not have to be processing the same element at the same time; their respective concurrent processing may be slow at different elements. A larger buffer window reduces the need for the forks to wait for their slower peers (if the slowness is on random elements rather than on every element).
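
The “moving window” can be sketched with a shared deque guarded by a condition variable. This is an illustration of the idea under simplifying assumptions (elements are fetched on demand by whichever fork needs them, and the forks are plain generators rather than Stream objects); the names _Window and tee_sketch are made up, and this is not the library’s implementation.

```python
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor

_END = object()  # returned to a fork when the input is exhausted


class _Window:
    def __init__(self, instream, n, buffer_size):
        self._it = iter(instream)
        self._size = buffer_size
        self._buf = deque()      # the elements currently in the window
        self._start = 0          # stream index of self._buf[0]
        self._pos = [0] * n      # next stream index wanted by each fork
        self._exhausted = False
        self._cond = threading.Condition()

    def next_for(self, k):
        with self._cond:
            while True:
                offset = self._pos[k] - self._start
                if offset < len(self._buf):
                    item = self._buf[offset]
                    self._pos[k] += 1
                    # Drop leading elements that every fork has obtained.
                    while self._buf and min(self._pos) > self._start:
                        self._buf.popleft()
                        self._start += 1
                    self._cond.notify_all()
                    return item
                if self._exhausted:
                    return _END
                if len(self._buf) < self._size:
                    try:
                        self._buf.append(next(self._it))
                    except StopIteration:
                        self._exhausted = True
                    continue
                self._cond.wait()  # window is full: wait for slower forks


def tee_sketch(instream, n=2, *, buffer_size=256):
    window = _Window(instream, n, buffer_size)

    def fork(k):
        while True:
            item = window.next_for(k)
            if item is _END:
                return
            yield item

    return tuple(fork(k) for k in range(n))


# The forks must be consumed concurrently, since the stream (50 elements)
# is longer than the window (4 elements).
a, b = tee_sketch(range(50), 2, buffer_size=4)
with ThreadPoolExecutor(max_workers=2) as pool:
    fa = pool.submit(sum, a)
    fb = pool.submit(list, b)
    total, items = fa.result(), fb.result()
```

By contrast, the standard itertools.tee stores as many elements as needed when one fork runs ahead of the others, so its forks can be consumed one after another at the cost of unbounded memory.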