Flexible service using mpserver

mpservice.mpserver provides classes that use multiprocessing to perform CPU-bound operations taking advantage of all the CPUs (i.e. cores) on the machine. Using threading to perform IO-bound operations is equally supported, although it was not the initial focus.

There are three levels of constructs.

  1. On the lowest level is Worker. This defines operations on a single input item or a batch of items in usual sync code. This is supposed to run in its own process (or thread) and use that single process (or thread) only. In other words, to keep things simple, the user-defined behavior of Worker should not launch processes or threads.

  2. On the middle level is Servlet. A basic form of Servlet arranges to execute a Worker in one or more processes (or threads). More advanced forms of Servlet arrange to execute multiple Servlets as a sequence or an ensemble, or select a Servlet (from a set of Servlets) to process a particular input element based on certain conditions.

  3. On the top level is Server (or AsyncServer). A Server handles interfacing with the outside world, passing the “real work” to a Servlet and relaying the latter’s result back to the outside world.

Example

Let’s make up an interesting problem that involves several expensive steps that demand a lot of computing power. We decided to use multiprocessing to speed things up. First, define the few operations that will take place in separate processes:

from time import sleep
from mpservice.mpserver import Worker, make_worker, ProcessServlet, ThreadServlet, SequentialServlet, EnsembleServlet


class GetHead(Worker):
    def call(self, x):
        sleep(0.01)
        return x[0]


class GetTail(Worker):
    def call(self, x):
        sleep(0.011)
        return x[-1]


class GetLen(Worker):
    def call(self, x):
        sleep(0.012)
        return len(x)


class Solute(Worker):
    def call(self, x):
        return f"Hello, {x}!"

This is what they do:

  • Given a sequence, GetHead returns the first element, GetTail returns the last element, GetLen returns the length of the sequence.

  • Given something, Solute returns a welcome message.

Second, specify how these operations work together. In other words, define a flow that any input value will go through.

servlet = SequentialServlet(
    EnsembleServlet(
        ProcessServlet(GetHead, cpus=[0]),
        ProcessServlet(GetTail, cpus=[0, 1]),
        ProcessServlet(GetLen, cpus=[1]),
        ),
    ThreadServlet(make_worker(lambda x: (x[0] + x[1]) * x[2])),
    ProcessServlet(Solute),
    )

In words, given input x, it goes through such a flow of operations:

  1. On the high level, the flow is a “sequence” of three components. The first is an EnsembleServlet, the second is a ThreadServlet, and the third is a ProcessServlet.

    The input x enters the EnsembleServlet, the output of which enters the ThreadServlet, the output of which enters the ProcessServlet, the output of which is the final result.

  2. The EnsembleServlet arranges GetHead, GetTail, and GetLen to run in separate processes because they are compute-intensive.

    For precise control, we have specified which CPUs each component should run on. This also shows how many processes each component creates and runs in. For example, GetTail uses two processes running on the first and the second CPU, respectively.

    Because this is an ensemble servlet, each of its components GetHead, GetTail, and GetLen receives the input x and produces its own output. The three outputs form a list (in the order of the three operators), which is the output of the EnsembleServlet. In other words, the output is the list [first_elem, last_elem, len].

  3. On the output of the EnsembleServlet, we apply a simple function, which adds up the first two elements and multiplies the sum by the third element. This is a light-weight operation, so we do it in a thread instead of a process.

    We could have defined a subclass of Worker and then wrapped it in a ThreadServlet. For demonstration, we chose to use make_worker() to dynamically define and return such a class.

  4. The output of the ThreadServlet becomes the input to Solute. This is again a heavy computation, hence we run it in another process. This process can use any CPU because we did not provide the cpus argument.

    The output of Solute is the final result of the SequentialServlet.

The Worker, ProcessServlet, ThreadServlet, EnsembleServlet, and SequentialServlet (and SwitchServlet, not shown above) are just a “spec” of the flow. They do not run by themselves. There needs to be a “driver” that starts them, connects them to the “outside world”, passes input to them, and collects output from them. That’s the job of a Server. To be precise, a Server does not interact with all of “them”; it directly interacts with only one Servlet. In the example above, that’s the SequentialServlet:

from mpservice.mpserver import Server

server = Server(servlet)

All this code is in a script named “test.py”. Here’s the remaining content of the script:

def main():

    with server:
        x = 'world'
        y = server.call(x)
        print(y)

        x = [1, 2, 3, 4]
        y = server.call(x)
        print(y)


if __name__ == '__main__':
    main()

Before continuing, can you figure out what will be printed?

Workers

class mpservice.mpserver.Worker[source]#

Bases: object

Worker defines operations on a single input item or a batch of items in usual synchronous code. This is supposed to run in its own process (or thread) and use that single process (or thread) only.

Typically a subclass needs to enhance __init__() and implement call(), and leave the other methods intact.

A Worker object is not created and used by itself. It is always started by a ProcessServlet or a ThreadServlet.

classmethod run(*, q_in: _SimpleProcessQueue | _SimpleThreadQueue, q_out: _SimpleProcessQueue | _SimpleThreadQueue, **init_kwargs)[source]#

A Servlet object will arrange to start a Worker object in a thread or process. This classmethod will be the target argument to Thread or Process.

This method creates a Worker object and calls its start() method to kick off the work.

Parameters:
q_in

A queue that carries input elements to be processed.

If used in a ProcessServlet, q_in is a _SimpleProcessQueue. If used in a ThreadServlet, q_in is either a _SimpleProcessQueue or a _SimpleThreadQueue.

q_out

A queue that carries output values.

If used in a ProcessServlet, q_out is a _SimpleProcessQueue. If used in a ThreadServlet, q_out is either a _SimpleProcessQueue or a _SimpleThreadQueue.

The elements in q_out are results for each individual element in q_in. “Batching” is an internal optimization for speed; q_out does not contain result batches, but rather results of individuals.

**init_kwargs

Passed on to __init__().

If the worker is going to run in a child process, then elements in **init_kwargs go through pickling, hence they should consist mainly of small, Python builtin types such as string, number, small dicts, etc. Be careful about passing custom class objects in **init_kwargs.

__init__(*, worker_index: int, batch_size: int | None = None, batch_wait_time: float | None = None, cpu_affinity: int | list[int] | None = None)[source]#

The main concern here is to set up controls for “batching” via the two parameters batch_size and batch_wait_time.

If the algorithm cannot vectorize the computation, then there is no advantage in enabling batching. In that case, the subclass should simply fix batch_size to 0 in its __init__ and invoke super().__init__ accordingly.

The __init__ of a subclass may define additional input parameters; they can be passed in through run().

Parameters:
worker_index

0-based sequential number of the worker in a “servlet”. A subclass may use this to distinguish the worker processes/threads in the same Servlet and give them some different treatments, although they do essentially the same thing. For example, let each worker use one particular GPU.

This argument is provided in Servlet.start() when starting the worker. A subclass does not worry about providing this argument; it simply uses it if needed. The parameter has a proper value in __init__ and continues to be available as an instance attribute.

batch_size

Max batch size; see call().

Remember to pass in batch_size in accordance with the implementation of call(). In other words, if batch_size > 0, then call() must handle a list input that contains a batch of elements. On the other hand, if batch_size is 0, then the input to call() is a single element.

If None, then 0 is used, meaning no batching.

If batch_size=1, then processing is batched in form without speed benefits of batching.

batch_wait_time

Seconds, may be 0; the total duration to wait for one batch after the first item has arrived.

For example, suppose batch_size is 100 and batch_wait_time is 1. After the first item has arrived, if at least 99 more items arrive within 1 second, then a batch of 100 elements will be produced; if fewer than 99 items arrive within 1 second, then the wait will stop at 1 second, hence a batch of fewer than 100 elements will be produced; the batch could have only one element.

If 0, then there’s no wait. After the first element is obtained, if there are more elements in q_in “right there right now”, they will be retrieved until a batch of batch_size elements is produced. Any moment when q_in is empty, the collection will stop, and the elements collected so far (less than batch_size count of them) will make a batch. In other words, batching happens only for items that are already “piled up” in q_in at the moment.

To leverage batching, it is recommended to set batch_wait_time to a small positive value. Small, so that there is not much futile waiting. Positive (as opposed to 0), so that it always waits a little bit just in case more elements are coming in.

When batch_wait_time > 0, it will hurt performance during sequential calls (i.e. send a request with a single element, wait for the result, then send the next, and so on), because this worker will always wait for this long for additional items to come and form a batch, yet additional items will never come during sequential calls. However, when batching is enabled, sequential calls are not the intended use case. Beware of this factor in benchmarking.

If batch_size is 0 or 1, then batch_wait_time should be left unspecified; if it is specified, the only valid value is 0.

If batch_size > 1, then batch_wait_time is 0.01 by default.
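The interplay between batch_size and batch_wait_time described above can be sketched with a plain queue.Queue. This is only an illustration of the documented behavior, not mpservice's internal code; the helper name collect_batch is hypothetical.

```python
import queue
import time


def collect_batch(q, batch_size, batch_wait_time):
    """Collect up to `batch_size` items from `q` (hypothetical helper)."""
    batch = [q.get()]  # block until the first item arrives
    # The wait budget starts once the first item has arrived.
    deadline = time.monotonic() + batch_wait_time
    while len(batch) < batch_size:
        try:
            if batch_wait_time > 0:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break  # total wait budget is used up
                item = q.get(timeout=remaining)
            else:
                # No waiting: take only what is already piled up.
                item = q.get_nowait()
        except queue.Empty:
            break
        batch.append(item)
    return batch


q = queue.Queue()
for i in range(5):
    q.put(i)

first = collect_batch(q, batch_size=3, batch_wait_time=0)
second = collect_batch(q, batch_size=3, batch_wait_time=0)
print(first, second)  # [0, 1, 2] [3, 4]
```

With batch_wait_time=0 the second batch is short because the queue ran empty; with a positive value the same call would instead wait up to that long for more items.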

cpu_affinity

Which CPUs this worker process is going to be “pinned” to. If None, no pinning.

If this worker is used in a ThreadServlet, this parameter is not specified in the call, and its value is None and is unused.

preprocess: Callable[[Any], Any]#

If a subclass has a method preprocess or an attribute preprocess that is a free-standing function, this method or function must take one data element (not a batch) as the sole, positional argument. This processes/transforms the data, and the output is used in call(). If this function raises an exception, this element is not sent to call(); instead, the exception object is short-circuited to the output queue.

When self.batch_size > 1, if call() needs to take care of an element of the batch that might fail a pre-condition, it is tedious to route the “good” elements to further processing and the “bad” ones to the output while keeping everything in the right order. This preprocess mechanism helps to deal with that situation.

When a subclass is designed to do non-batching work, this attribute is not necessary, because the same concern can be handled in call() directly.

When self.preprocess is defined, it is used in _start_single() and _build_input_batches().

If a Server contains a single servlet, which uses this Worker, then the functionalities of this self.preprocess can be largely provided by the parameter preprocessor to Server.stream. In that case, there is no need for this self.preprocess.
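To make the “good” versus “bad” element handling concrete, here is a minimal sketch in plain Python. It assumes nothing about mpservice's private methods; run_batch is a hypothetical helper that mimics the described effect: elements that fail preprocess are replaced by their exception objects, and only the rest reach the batched call.

```python
def run_batch(batch, preprocess, call):
    """Apply `preprocess` per element, then `call` on the surviving batch.

    Hypothetical helper; results come back in the original order.
    """
    results = [None] * len(batch)
    good_positions, good_items = [], []
    for pos, x in enumerate(batch):
        try:
            good_items.append(preprocess(x))
            good_positions.append(pos)
        except Exception as exc:
            # Short-circuit: the exception object itself becomes the result.
            results[pos] = exc
    if good_items:
        for pos, y in zip(good_positions, call(good_items)):
            results[pos] = y
    return results


out = run_batch(["1", "oops", "3"], int, lambda xs: [v * 10 for v in xs])
print(out[0], type(out[1]).__name__, out[2])  # 10 ValueError 30
```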

call(x)[source]#

Private methods of this class wait on the input queue to gather “work orders”, send them to call() for processing, collect the outputs of call(), and put them in the output queue.

If self.batch_size == 0, then x is a single element, and this method returns result for x.

x is not an Exception or RemoteException object; such a value would have been routed to the outgoing pipe and not passed to this method. The same is true for elements of x when self.batch_size > 0.

If self.batch_size > 0 (including 1), then x is a list of input data elements, and this method returns a list (or Sequence) of results corresponding to the elements in x. However, this output, when received by private methods of this class, will be split and individually put in the output queue, so that the elements in the output queue (q_out) correspond to the elements in the input queue (q_in), although vectorized computation, or batching, has happened internally.

When batching is enabled (i.e. when self.batch_size > 0), the number of elements in x varies between calls depending on the supply in the input queue. The list x does not have a fixed length.

Be sure to distinguish the case with batching (batch_size > 0) from the case without batching (batch_size = 0) where a single input happens to be a list. In the latter case, the output of this method is the result corresponding to the single input x. The result could be anything; it may or may not be a list.

If a subclass fixes batch_size in its __init__ to be 0 or nonzero, make sure this method is implemented accordingly.

If __init__ does not fix the value of batch_size, then a particular instance may have been created with or without batching. In this case, this method needs to check self.batch_size and act accordingly.
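The contract of call() with and without batching can be illustrated with two stand-in classes. They deliberately do not subclass the real Worker so that the snippet runs on its own; the names SquareSketch and SumSketch are hypothetical.

```python
class SquareSketch:
    """Stand-in following the call() contract; not a real mpservice Worker."""

    def __init__(self, batch_size=0):
        self.batch_size = batch_size

    def call(self, x):
        if self.batch_size > 0:
            # Batching: x is a list of elements; return one result per element.
            return [v * v for v in x]
        # No batching: x is a single element.
        return x * x


class SumSketch:
    """Non-batching stand-in whose single input happens to be a list."""

    batch_size = 0

    def call(self, x):
        return sum(x)


print(SquareSketch().call(3))                        # 9
print(SquareSketch(batch_size=8).call([1, 2, 3]))    # [1, 4, 9]
print(SumSketch().call([1, 2, 3]))                   # 6
```

The last two calls receive the same list, but only the first treats it as a batch.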

If this method raises exceptions, unless the user has specific things to do, do not handle them; just let them happen. They will be handled in private methods of this class that call this method.

Usually this is the only method a subclass needs to customize. In rare cases, a subclass may want to customize stream() instead of or in addition to call().

stream(xx: Iterable) Iterator[source]#

xx is an iterable of input x to call(). (If self.batch_size > 0, then xx is an iterable of batches.) This function yields the results of call() for the elements of xx, in the right order. If any invocation of call() raises an exception, the exception object is yielded.

The elements of xx (or the elements of the elements of xx when self.batch_size > 0) are not instances of Exception or RemoteException. Such values would have been routed to the outgoing pipe and not passed to this method.

The background loop in start() calls this method and does not call call() directly.

This method is provided mainly for the special use cases where a subclass wants to set self.num_stream_threads to a positive number, thereby use threading concurrency in this method.

If a subclass re-implements this method without calling call(), then call() does not need to be implemented, because this method is the only place of this class that calls call().

cleanup(exc=None)[source]#

This method is called when the object exits its service loop and stops. This is the place for cleanup code, e.g. releasing resources, exiting context managers (that have been entered in __init__()), etc.

start(*, q_in, q_out)[source]#

This is called by run() to kick off the processing loop.

To stop the processing, pass in the constant None through q_in.
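The stop-by-sentinel convention can be sketched with the standard library alone. The loop below is a simplified stand-in for what a worker's processing loop does; it is an assumption-laden illustration, not the actual implementation of start(), and the function name worker_loop is hypothetical.

```python
import queue
import threading


def worker_loop(q_in, q_out, func):
    """Process items from q_in until the sentinel None arrives (sketch)."""
    while True:
        x = q_in.get()
        if x is None:
            break  # sentinel received: leave the service loop
        try:
            y = func(x)
        except Exception as exc:
            y = exc  # failures travel downstream as exception objects
        q_out.put(y)


q_in, q_out = queue.Queue(), queue.Queue()
t = threading.Thread(target=worker_loop, args=(q_in, q_out, lambda v: v * 2))
t.start()
for item in (1, 2, None):
    q_in.put(item)
t.join()

results = [q_out.get_nowait() for _ in range(2)]
print(results)  # [2, 4]
```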

mpservice.mpserver.make_worker(func: Callable[[Any], Any]) type[Worker][source]#

This function defines and returns a simple Worker subclass for quick, “on-the-fly” use. This can be useful when we want to introduce simple servlets for pre-processing and post-processing.

Parameters:
func

This function is what happens in the method call().

mpservice.mpserver.PassThrough = <class 'mpservice.mpserver._worker.PassThrough'>[source]#

Example use of this class:

def combine(x):
    '''
    Combine the ensemble elements depending on the results
    as well as the original input.
    '''
    x, *y = x
    assert len(y) == 3
    if x < 100:
        return sum(y) / len(y)
    else:
        return max(y)

s = EnsembleServlet(
        ThreadServlet(PassThrough),
        ProcessServlet(W1),
        ProcessServlet(W2),
        ProcessServlet(W3),
    )
ss = SequentialServlet(s, ThreadServlet(make_worker(combine)))

Servlets

class mpservice.mpserver.Servlet[source]#

Bases: ABC

A Servlet manages the execution of one or more Workers. We make a distinction between “simple” servlets, including ProcessServlet and ThreadServlet, and “compound” servlets, including SequentialServlet, EnsembleServlet, and SwitchServlet.

A simple servlet arranges to execute one Worker in the requested number of processes (or threads). Optionally, it can specify exactly which CPU(s) each worker process should use. Each input item is passed to and processed by exactly one of the processes (or threads).

A compound servlet arranges to execute multiple Servlets as a sequence or an ensemble. In addition, there is SwitchServlet that acts as a “switch” in front of a set of Servlets. There’s a flavor of recursion in this definition in that a member servlet can very well be a compound servlet.

Great power comes from this recursive definition. In principle, we can freely compose and nest the Servlet types. For example, suppose W1, W2,…, are Worker subclasses, then we may design such a workflow,

s = SequentialServlet(
        ProcessServlet(W1),
        EnsembleServlet(
            ThreadServlet(W2),
            SequentialServlet(ProcessServlet(W3), ThreadServlet(W4)),
            ),
        EnsembleServlet(
            SequentialServlet(ProcessServlet(W5), ProcessServlet(W6)),
            SequentialServlet(ProcessServlet(W7), ThreadServlet(W8), ProcessServlet(W9)),
            ),
    )
abstract start(q_in, q_out) None[source]#
abstract stop() None[source]#

When this method is called, there shouldn’t be “pending” work in the servlet. If for whatever reason, the servlet is in a busy, messy state, it is not required to “wait” for things to finish. The priority is to ensure the processes and threads are stopped.

The primary mechanism to stop a servlet is to put the special constant None in the input queue. The user should have done that; but just to be sure, this method may do that again.

abstract property input_queue_type: Literal['thread', 'process']#

Indicate whether the input queue can be a “thread queue” (i.e. queue.Queue) or needs to be a “process queue” (i.e. multiprocessing.queues.Queue). If “thread”, caller can provide either a thread queue or a process queue. If “process”, caller must provide a process queue.

abstract property output_queue_type: Literal['thread', 'process']#

Indicate whether the output queue can be a “thread queue” (i.e. queue.Queue) or needs to be a “process queue” (i.e. multiprocessing.queues.Queue). If “thread”, caller can provide either a thread queue or a process queue. If “process”, caller must provide a process queue.

abstract property workers: list[Worker]#
abstract property children: list#
class mpservice.mpserver.ProcessServlet[source]#

Bases: Servlet

Use this class if the operation is CPU bound.

__init__(worker_cls: type[Worker], *, cpus: None | int | Sequence[None | int | Sequence[int]] = None, worker_name: str | None = None, **kwargs)[source]#
Parameters:
worker_cls

A subclass of Worker.

cpus

Specifies how many processes to create and how they are pinned to specific CPUs.

The default is None, indicating a single unpinned process.

If an int, indicating number of (unpinned) processes.

Otherwise, a list specifying CPU pinning. Each element of the list specifies the CPU pinning of one process. The number of processes created is the number of elements in cpus. The CPU spec is very flexible. For example,

cpus=[[0, 1, 2], [0], [2, 3], [4, 5, 6], 4, None]

This instructs the servlet to create 6 processes, each running an instance of worker_cls. The CPU affinity of each process is as follows:

  1. CPUs 0, 1, 2

  2. CPU 0

  3. CPUs 2, 3

  4. CPUs 4, 5, 6

  5. CPU 4

  6. Any CPU, no pinning
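One way to read the cpus spec is as a list with one entry per process, each entry being either None (unpinned) or a list of CPU indices. The helper below normalizes the three accepted forms into that shape. It is a hypothetical illustration of the rules stated above, not the parsing code used by ProcessServlet.

```python
def normalize_cpus(cpus):
    """Return one affinity entry per process: None or a list of CPU indices.

    Hypothetical helper mirroring the documented rules for `cpus`.
    """
    if cpus is None:
        return [None]              # a single unpinned process
    if isinstance(cpus, int):
        return [None] * cpus       # that many unpinned processes
    plan = []
    for spec in cpus:
        if spec is None:
            plan.append(None)      # this process may use any CPU
        elif isinstance(spec, int):
            plan.append([spec])    # pinned to one CPU
        else:
            plan.append(list(spec))
    return plan


plan = normalize_cpus([[0, 1, 2], [0], [2, 3], [4, 5, 6], 4, None])
print(len(plan))  # 6
print(plan)       # [[0, 1, 2], [0], [2, 3], [4, 5, 6], [4], None]
```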

worker_name

Prefix to the name of the worker processes. If not provided, a default is constructed based on the class name.

**kwargs

Passed to the __init__ method of worker_cls.

Notes

When the servlet has multiple processes, the output stream does not need to follow the order of the elements in the input stream.

start(q_in: _SimpleProcessQueue, q_out: _SimpleProcessQueue)[source]#

Create the requested number of processes, in each starting an instance of self._worker_cls.

Parameters:
q_in

A queue with input elements. Each element will be passed to and processed by exactly one worker process.

q_out

A queue for results.

stop()[source]#

Stop the workers.

property input_queue_type#
property output_queue_type#
property workers#
property children#
class mpservice.mpserver.ThreadServlet[source]#

Bases: Servlet

Use this class if the operation is I/O bound (e.g. calling an external service to get some info), and computation is very light compared to the I/O part. Another use-case of this class is to perform some very simple and quick pre-processing or post-processing.

__init__(worker_cls: type[Worker], *, num_threads: int | None = None, worker_name: str | None = None, **kwargs)[source]#
Parameters:
worker_cls

A subclass of Worker

num_threads

The number of threads to create. Each thread will host and run an instance of worker_cls. Default None means 1.

worker_name

Prefix to the name of the worker threads. If not provided, a default is constructed based on the class name.

**kwargs

Passed on to the __init__ method of worker_cls.

Notes

When the servlet has multiple threads, the output stream does not need to follow the order of the elements in the input stream.

start(q_in: _SimpleProcessQueue | _SimpleThreadQueue, q_out: _SimpleProcessQueue | _SimpleThreadQueue)[source]#

Create the requested number of threads, in each starting an instance of self._worker_cls.

Parameters:
q_in

A queue with input elements. Each element will be passed to and processed by exactly one worker thread.

q_out

A queue for results.

q_in and q_out are either _SimpleProcessQueues (for processes) or _SimpleThreadQueues (for threads). Because this servlet may be connected to either ProcessServlets or ThreadServlets, either type of queues may be appropriate. In contrast, for ProcessServlet, the input and output queues are both _SimpleProcessQueues.

stop()[source]#

Stop the worker threads.

property input_queue_type#
property output_queue_type#
property workers#
property children#
class mpservice.mpserver.SequentialServlet[source]#

Bases: Servlet

A SequentialServlet represents a sequence of operations performed in order, one operation’s output becoming the next operation’s input.

Each operation is performed by a “servlet”, that is, an instance of any subclass of Servlet, including SequentialServlet. However, a SequentialServlet as a direct member of another SequentialServlet may not be beneficial—you may as well flatten it out.

If any member servlet has multiple workers (threads or processes), the output stream does not need to follow the order of the elements in the input stream.

__init__(*servlets: Servlet)[source]#
start(q_in, q_out)[source]#

Start the member servlets.

A main concern is to connect the servlets by “pipes”, or queues, for input and output, in addition to the very first q_in, which carries input items from the “outside world” and the very last q_out, which sends output items to the “outside world”.

Each item in q_in goes to the first member servlet; the result goes to the second servlet; and so on. The result out of the last servlet goes to q_out.

The types of q_in and q_out are decided by the caller. The types of intermediate queues are decided within this function. As a rule, use _SimpleThreadQueue between two threads; use _SimpleProcessQueue between two processes or between a process and a thread.
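The rule for intermediate queues can be written down as a tiny function. This sketch simplifies each member servlet to a single label, "thread" or "process"; for compound members the relevant labels would presumably be the upstream member's output_queue_type and the downstream member's input_queue_type. The function name is hypothetical.

```python
def intermediate_queue_types(kinds):
    """Given member kinds in order, return the queue type between neighbors.

    A thread queue is used only when both neighbors are threads; any link
    that touches a process needs a process queue.
    """
    return [
        "thread" if upstream == "thread" and downstream == "thread" else "process"
        for upstream, downstream in zip(kinds, kinds[1:])
    ]


print(intermediate_queue_types(["process", "thread", "thread", "process"]))
# ['process', 'thread', 'process']
```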

stop()[source]#

Stop the member servlets.

property workers#
property children#
property input_queue_type#
property output_queue_type#
class mpservice.mpserver.EnsembleServlet[source]#

Bases: Servlet

An EnsembleServlet represents an ensemble of operations performed in parallel on each input item. The list of results, corresponding to the order of the operators, is returned as the result.

Each operation is performed by a “servlet”, that is, an instance of any subclass of Servlet.

If fail_fast is True (the default), once one ensemble member raises an Exception, an EnsembleError will be returned. Results of the other ensemble members that arrive afterwards will be ignored. This is not necessarily “fast”; the main point is that the item in question results in an Exception rather than a list that contains Exception(s).

If fail_fast is False, then Exception results, if any, are included in the result list. If all entries in the list are Exceptions, then the result list is replaced by an EnsembleError. Here is the logic: if the result list contains some valid results and some Exceptions, user may consider it “partially” successful and may choose to make use of the valid results in some way; if every ensemble member has failed, then the ensemble has failed, hence the result is a single EnsembleError.
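The two fail_fast modes can be summarized as a decision on the list of member results. The sketch below works on a complete list for simplicity, whereas the real servlet with fail_fast=True reacts as soon as the first exception arrives. EnsembleErrorSketch is a hypothetical stand-in for the library's EnsembleError.

```python
class EnsembleErrorSketch(Exception):
    """Hypothetical stand-in for mpservice's EnsembleError."""


def combine_results(results, fail_fast=True):
    """Decide what an ensemble yields for one input item (sketch)."""
    n_errors = sum(isinstance(r, Exception) for r in results)
    if fail_fast and n_errors > 0:
        # Any member failure fails the whole item.
        return EnsembleErrorSketch(results)
    if n_errors == len(results) and n_errors > 0:
        # Every member failed, so the ensemble as a whole failed.
        return EnsembleErrorSketch(results)
    # Partial success: exceptions stay in the list next to valid results.
    return results


mixed = [1, ValueError("bad"), 3]
print(type(combine_results(mixed, fail_fast=True)).__name__)   # EnsembleErrorSketch
print(type(combine_results(mixed, fail_fast=False)).__name__)  # list
```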

The output stream does not need to follow the order of the elements in the input stream.

__init__(*servlets: Servlet, fail_fast: bool = True)[source]#
start(q_in, q_out)[source]#

Start the member servlets.

A main concern is to wire up the parallel execution of all the servlets on each input item.

q_in and q_out contain inputs from and outputs to the “outside world”. Their types, either _SimpleProcessQueue or _SimpleThreadQueue, are decided by the caller.

stop()[source]#
property workers#
property children#
property input_queue_type#
property output_queue_type#
class mpservice.mpserver.SwitchServlet[source]#

Bases: Servlet

SwitchServlet contains multiple member servlets (which are provided to __init__()). Each input element is passed to and processed by exactly one of the members based on the output of the method switch().

This is somewhat analogous to the “switch” construct in some programming languages.

__init__(*servlets: Servlet)[source]#
start(q_in, q_out)[source]#
stop()[source]#
abstract switch(x) int[source]#
Parameters:
x

An element received via the input queue, that is, the parameter q_in to start(). If the current servlet is preceded by another servlet, then x is an output of the other servlet.

In principle, this method should not modify x. It is expected to be a quick check on x to determine which member servlet should process it.

x is never an instance of Exception or RemoteException. That case is already taken care of before this method is called. This method should not be used to handle such exception cases.

Returns:
int

The index of the member servlet that will receive and process x. This number is between 0 and the number of member servlets minus 1, inclusive.
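The role of switch() is easiest to see when the member servlets are replaced by plain functions. In this sketch the members, the length threshold, and the dispatch helper are all hypothetical; only the idea that switch(x) returns the index of the member that handles x comes from the description above.

```python
def switch(x):
    """Quick check on x; returns the index of the member to use."""
    return 0 if len(x) < 5 else 1


# Plain functions standing in for member servlets.
members = [
    lambda x: f"short:{x}",
    lambda x: f"long:{x}",
]


def dispatch(x):
    idx = switch(x)
    if not 0 <= idx < len(members):
        raise IndexError(f"switch returned {idx}, expected 0..{len(members) - 1}")
    return members[idx](x)


print(dispatch("abc"))      # short:abc
print(dispatch("abcdefg"))  # long:abcdefg
```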

property input_queue_type#
property output_queue_type#
property workers#
property children#

Server

class mpservice.mpserver.Server[source]#

Bases: object

The “interfacing” and “scheduling” code of Server runs in the “main process”. Two usage patterns are supported, namely making individual calls to the service to get individual results, or flowing a (potentially unlimited) stream of data through the service to get a stream of results.

A typical setup looks like this:

servlet = SequentialServlet(...)  # or other types of servlets
server = Server(servlet)
with server:
    z = server.call('abc')

    for x, y in server.stream(data, return_x=True):
        print(x, y)

Code in the “workers” (of servlet) should raise exceptions as it normally does, without handling them, if it considers the situation to be non-recoverable, e.g. input is of wrong type. The exceptions will be funneled through the pipelines and raised to the end-user with useful traceback info.

The user’s main work is implementing the operations in the “workers”. Another task (of some trial and error) by the user is experimenting with CPU allocations among workers to achieve best performance.

Server has an async counterpart named AsyncServer.

final classmethod get_mp_context()[source]#

If subclasses need to use additional Queues, Locks, Conditions, etc, they should create them out of this context. This returns a spawn context.

Subclasses should not customize this method.

__init__(servlet: Servlet, *, capacity: int = 256)[source]#
Parameters:
servlet

The servlet to be run by this server.

The servlet has not “started”. Its start() will be called in __enter__().

capacity

Max number of requests concurrently in progress within this server, all pipes/servlets/stages combined.

For each request received, a UUID is assigned to it. An entry is added to an internal book-keeping dict. Then this ID along with the input data enter the processing pipeline. Coming out of the pipeline is the ID along with the result. The book-keeping record is found by the ID, used, and removed from the dict. Removal of finished requests from the book-keeping dict makes room for new requests. The capacity value is simply the size limit on this internal book-keeping dict.
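The book-keeping described above amounts to a size-limited dict keyed by request ID. The class below is a minimal sketch of that idea under stated assumptions; its name, its methods, and the use of a plain RuntimeError are hypothetical and do not reflect Server's actual internals.

```python
import uuid


class BookkeepingSketch:
    """Size-limited registry of in-flight requests (hypothetical)."""

    def __init__(self, capacity=256):
        self.capacity = capacity
        self._pending = {}

    @property
    def backlog(self):
        return len(self._pending)

    def admit(self, x):
        """Register a new request and return its ID."""
        if self.backlog >= self.capacity:
            raise RuntimeError("backlog is full")
        uid = uuid.uuid4().hex
        self._pending[uid] = x
        return uid

    def finish(self, uid):
        """Remove a finished request, making room for a new one."""
        return self._pending.pop(uid)


book = BookkeepingSketch(capacity=2)
a = book.admit("first")
b = book.admit("second")
print(book.backlog)  # 2
book.finish(a)
print(book.backlog)  # 1
```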

property capacity: int#

The value of the parameter capacity to __init__().

property backlog: int#

The number of items currently being processed in the server. They may be in various stages.

__enter__()[source]#

The main operations conducted in this method include:

  • Start the servlet (hence creating and starting Worker instances).

  • Set up queues between the main thread (this “server” object) and the workers for passing input elements, intermediate results, and final results.

  • Set up background threads responsible for taking incoming calls and gathering/returning results. Roughly speaking, when call() is invoked, its input is handled by a thread, which places the input in a pipeline with appropriate book-keeping; it then waits for the result, which becomes available once another thread gathers the result from (another end of) the pipeline.

__exit__(*args)[source]#

The main operations conducted in this method include:

  • Place a special sentinel in the pipeline to indicate the end of operations; all Workers in the servlet will eventually see the sentinel and exit.

  • Wait for the servlet and all helper threads to exit.

call(x, /, *, timeout: int | float = 60, backpressure: bool = True)[source]#

Serve one request with input x, return the result.

This method is thread-safe, meaning it can be called from multiple threads concurrently.

If the operation failed due to ServerBacklogFull, TimeoutError, or any exception in any parts of the server, the exception is propagated.

Parameters:
x

Input data element.

timeout

In seconds. If result is not ready after this time, TimeoutError is raised.

There are two situations where timeout happens. First, x is placed in an input queue for processing. This step is called “enqueue”. If the queue is full for the moment, the code will wait (if backpressure is False). If a spot does not become available during the timeout period, the ServerBacklogFull error message will be “… seconds enqueue”.

Effectively, the input queue is considered full if there are capacity count of ongoing (i.e. received but not yet finished) requests in the server in all stages combined, where capacity is a parameter to __init__().

Once x is placed in the input queue, the code will wait for the result to come out at the end of an output queue. If the result is not yet ready when the timeout period is over, the TimeoutError message will be “.. seconds total”. This timeout period includes the time that has been spent in the “enqueue” step, that is, the timer starts upon receiving the request, i.e. at the beginning of the function call.

backpressure

If True, and the input queue is full (that is, self.backlog == self.capacity), do not wait; raise ServerBacklogFull right away. If False, wait on the input queue for as long as timeout seconds.

The exception ServerBacklogFull may indicate an erroneous situation, where the server is somehow (almost) stuck and cannot process requests as quickly as expected, or a valid situation, where the server is getting requests faster than its expected load.
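The difference between the two backpressure settings at the “enqueue” step can be sketched with a counting semaphore whose slots play the role of free capacity. This is an analogy built on the standard library, not the mechanism Server uses; enqueue and BacklogFullSketch are hypothetical names.

```python
import threading


class BacklogFullSketch(RuntimeError):
    """Hypothetical stand-in for ServerBacklogFull."""


def enqueue(slots, *, timeout, backpressure):
    """Claim one capacity slot or raise (sketch of the 'enqueue' step)."""
    if backpressure:
        # Do not wait at all when the backlog is full.
        acquired = slots.acquire(blocking=False)
    else:
        # Wait for a slot, but no longer than `timeout` seconds.
        acquired = slots.acquire(timeout=timeout)
    if not acquired:
        raise BacklogFullSketch("no free slot")


slots = threading.Semaphore(1)  # capacity of one request
enqueue(slots, timeout=1, backpressure=True)  # takes the only slot

try:
    enqueue(slots, timeout=1, backpressure=True)
except BacklogFullSketch:
    print("rejected immediately")

slots.release()  # a request finished, freeing its slot
enqueue(slots, timeout=1, backpressure=False)
print("admitted")
```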

debug_info() dict[source]#

Return a dict with various pieces of info, about the status and health of the server, that may be helpful for debugging. Example content includes self.backlog, active child processes, status (alive/dead) of helper threads.

The content of the returned dict is str, int, or other types that are friendly to json.

stream(data_stream: Iterable, /, *, return_x: bool = False, return_exceptions: bool = False, timeout: int | float = 3600, preprocessor: Callable | None = None) Iterator[source]#

Use this method for high-throughput processing of a long stream of data elements. In theory, this method achieves the throughput upper-bound of the server, as it saturates the pipeline.

The order of elements in the stream is preserved, i.e., elements in the output stream correspond to elements in the input stream in the same order.

This method is thread-safe, that is, multiple threads can call this method concurrently with their respective input streams. The server will serve the requests interleaved as fast as it can. In that case, you may want to use return_exceptions=True in each of the calls so that one’s exception does not propagate and halt the program.

It is also fine to have calls to call() and stream() concurrently (from multiple threads, for example).

Parameters:
data_stream

A (possibly unlimited) iterable of input data elements.

return_x

If True, each output element is a length-two tuple containing the input data and the result. If False, each output element is just the result.

return_exceptions

If True, any Exception object will be produced in the output stream in place of the would-be regular result. If False, exceptions will be propagated right away, crashing the program.

timeout

Interpreted the same as in call().

In a streaming task, “timeout” is usually not a concern compared to overall throughput. You can usually leave it at the default value or make it even larger as needed.

preprocessor

See fifo_stream().
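The combination of concurrency with order preservation, plus the return_x and return_exceptions options, can be sketched using a thread pool and a FIFO of futures. This is not how Server.stream is implemented; here a plain function runs in threads instead of a servlet, and ordered_stream and max_pending are hypothetical names.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def ordered_stream(data, func, *, return_x=False, return_exceptions=False,
                   max_pending=8):
    """Yield func(x) for each x in data, in input order (sketch)."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = deque()  # (input, future) pairs in submission order

        def pop_oldest():
            x, fut = pending.popleft()
            try:
                y = fut.result()
            except Exception as exc:
                if not return_exceptions:
                    raise
                y = exc  # the exception takes the place of the result
            return (x, y) if return_x else y

        for x in data:
            pending.append((x, pool.submit(func, x)))
            if len(pending) >= max_pending:
                yield pop_oldest()  # cap the number of in-flight items
        while pending:
            yield pop_oldest()


def risky(v):
    if v == 2:
        raise ValueError("two is not allowed")
    return v * v


print(list(ordered_stream(range(4), lambda v: v * v, return_x=True)))
# [(0, 0), (1, 1), (2, 4), (3, 9)]
out = list(ordered_stream(range(4), risky, return_exceptions=True))
print([type(r).__name__ for r in out])  # ['int', 'int', 'ValueError', 'int']
```

Results are always taken from the oldest pending future, which is what keeps the output aligned with the input even when later items finish first.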

class mpservice.mpserver.AsyncServer[source]#

Bases: object

An AsyncServer object must be started in an async context manager. The primary methods call() and stream() are async.

Most concepts and usage are analogous to Server.

final classmethod get_mp_context()[source]#
__init__(servlet: Servlet, *, capacity: int = 256)[source]#
property capacity: int#
property backlog: int#
async __aenter__()[source]#
async __aexit__(*args)[source]#
async call(x, /, *, timeout: int | float = 60, backpressure: bool = True)[source]#

When this is called, this server is usually backing a (http or other) service using some async framework. Concurrent async calls to this method may happen.

See also

Server.call()

debug_info() dict[source]#
async stream(data_stream: AsyncIterable, /, *, return_x: bool = False, return_exceptions: bool = False, timeout: int | float = 3600, preprocessor: Callable | None = None) AsyncIterator[source]#

Calls to stream() and call() can happen at the same time (i.e. interleaved); multiple calls to stream() can also happen at the same time by different “users” (in the same thread).

See also

Server.stream()

exception mpservice.mpserver.ServerBacklogFull[source]#

Bases: RuntimeError

__init__(n, x=None)[source]#
exception mpservice.mpserver.TimeoutError[source]#

Bases: TimeoutError

Answer to the Example challenge: when we run the script, this is the printout:

$ python test.py
Hello, wdwdwdwdwd!
Hello, 20!
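The printout can be checked by hand with a plain-Python trace of the same flow. The functions below are hypothetical stand-ins that mirror the example's workers without any processes, threads, or sleeps.

```python
def ensemble(x):
    # Mirrors GetHead, GetTail, and GetLen, in that order.
    return [x[0], x[-1], len(x)]


def combine(x):
    # Mirrors the function passed to make_worker() in the example.
    return (x[0] + x[1]) * x[2]


def solute(x):
    # Mirrors Solute.call().
    return f"Hello, {x}!"


def flow(x):
    return solute(combine(ensemble(x)))


print(flow("world"))       # Hello, wdwdwdwdwd!
print(flow([1, 2, 3, 4]))  # Hello, 20!
```

For 'world' the ensemble yields ['w', 'd', 5], so the combined value is the string 'wd' repeated 5 times; for [1, 2, 3, 4] it yields [1, 4, 4], giving (1 + 4) * 4 = 20.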