Flexible service using mpserver
mpservice.mpserver provides classes that use multiprocessing to perform CPU-bound operations
taking advantage of all the CPUs (i.e. cores) on the machine.
Using threading to perform IO-bound operations is equally supported, although it was not the initial focus.
There are three levels of constructs.

- On the lowest level is Worker. It defines operations on a single input item or a batch of items in ordinary synchronous code. A Worker is meant to run in its own process (or thread) and use only that process (or thread). In other words, to keep things simple, the user-defined behavior of a Worker should not launch processes or threads.
- On the middle level is Servlet. A basic Servlet arranges to execute a Worker in one or more processes (or threads). More advanced Servlets execute multiple Servlets as a sequence or an ensemble, or select one Servlet (from a set of Servlets) to process a particular input element based on certain conditions.
- On the top level is Server (or AsyncServer). A Server handles interfacing with the outside world: it passes the “real work” to a Servlet and relays that Servlet's result back to the outside world.
Example
Let’s make up an interesting problem that involves several expensive steps that demand a lot of computing power. We decided to use multiprocessing to speed things up. First, define the few operations that will take place in separate processes:
```python
from time import sleep

from mpservice.mpserver import Worker, make_worker, ProcessServlet, ThreadServlet, SequentialServlet, EnsembleServlet


class GetHead(Worker):
    def call(self, x):
        sleep(0.01)
        return x[0]


class GetTail(Worker):
    def call(self, x):
        sleep(0.011)
        return x[-1]


class GetLen(Worker):
    def call(self, x):
        sleep(0.012)
        return len(x)


class Solute(Worker):
    def call(self, x):
        return f"Hello, {x}!"
```
This is what they do:

- Given a sequence, GetHead returns the first element, GetTail returns the last element, and GetLen returns the length of the sequence.
- Given something, Solute returns a welcome message.
Second, specify how these operations work together. In other words, define a flow that any input value will go through.
```python
servlet = SequentialServlet(
    EnsembleServlet(
        ProcessServlet(GetHead, cpus=[0]),
        ProcessServlet(GetTail, cpus=[0, 1]),
        ProcessServlet(GetLen, cpus=[1]),
    ),
    ThreadServlet(make_worker(lambda x: (x[0] + x[1]) * x[2])),
    ProcessServlet(Solute),
)
```
In words, a given input x goes through the following flow of operations:

- At the top level, the flow is a “sequence” of three components. The first is an EnsembleServlet, the second is a ThreadServlet, and the third is a ProcessServlet.
- The input x enters the EnsembleServlet. The EnsembleServlet's output enters the ThreadServlet, the ThreadServlet's output enters the ProcessServlet, and the ProcessServlet's output is the final result.
- The EnsembleServlet runs GetHead, GetTail, and GetLen in separate processes because they are compute-intensive.
- For precise control, we have specified which CPUs each component should run on. This also determines how many processes each component creates and runs in. For example, GetTail uses two processes, running on the first and the second CPU respectively.
- Because this is an ensemble servlet, each of its components (GetHead, GetTail, and GetLen) receives the input x and produces its own output. The three outputs form a list, ordered like the three operators, and that list is the output of the EnsembleServlet. In other words, the output is the list [first_elem, last_elem, len].
- On the output of the EnsembleServlet, we apply a simple function that adds the first two elements and multiplies the sum by the third element. This is a lightweight operation, so we do it in a thread instead of a process.
- We could have defined a subclass of Worker and then wrapped it in a ThreadServlet. For demonstration, we chose to use make_worker() to dynamically define and return such a class.
- The output of the ThreadServlet becomes the input to Solute. This is again a heavy computation, so we run it in another process. This process can use any CPU because we did not provide the cpus argument.
- The output of Solute is the final result of the SequentialServlet.
Worker, ProcessServlet, ThreadServlet, EnsembleServlet, and SequentialServlet (and SwitchServlet, not shown above) are only a “spec” of the flow. They do not run by themselves. A “driver” must start them, connect them to the “outside world”, pass input to them, and collect output from them. That is the job of a Server. To be precise, a Server does not interact with all of “them”. It interacts directly with only one Servlet, which in the example above is the SequentialServlet:
```python
from mpservice.mpserver import Server

server = Server(servlet)
```
All this code is in a script named “test.py”. Here’s the remaining content of the script:
```python
def main():
    with server:
        x = 'world'
        y = server.call(x)
        print(y)

        x = [1, 2, 3, 4]
        y = server.call(x)
        print(y)


if __name__ == '__main__':
    main()
```
Before continuing, can you figure out what will be printed?
Workers
- class mpservice.mpserver.Worker
Bases: object

Worker defines operations on a single input item or a batch of items in ordinary synchronous code. It is meant to run in its own process (or thread) and use only that process (or thread).

Typically a subclass needs to enhance __init__() and implement call(), and leave the other methods intact.

A Worker object is not created and used by itself. It is always started by a ProcessServlet or a ThreadServlet.

- classmethod run(*, q_in: _SimpleProcessQueue | _SimpleThreadQueue, q_out: _SimpleProcessQueue | _SimpleThreadQueue, **init_kwargs)
A Servlet object arranges to start a Worker object in a thread or process. This classmethod serves as the target argument to Thread or Process.

This method creates a Worker object and calls its start() method to kick off the work.

- Parameters:
- q_in
A queue that carries input elements to be processed.
If used in a ProcessServlet, q_in is a _SimpleProcessQueue. If used in a ThreadServlet, q_in is either a _SimpleProcessQueue or a _SimpleThreadQueue.
- q_out
A queue that carries output values.
If used in a ProcessServlet, q_out is a _SimpleProcessQueue. If used in a ThreadServlet, q_out is either a _SimpleProcessQueue or a _SimpleThreadQueue.
Each element in q_out is the result for one individual element of q_in. “Batching” is an internal speed optimization; q_out contains individual results, not result batches.
- **init_kwargs
Passed on to __init__().
If the worker is going to run in a child process, then elements in **init_kwargs go through pickling. They should therefore consist mainly of small, built-in Python types such as strings, numbers, small dicts, etc. Be careful about passing custom class objects in **init_kwargs.
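To make the queue protocol concrete, here is a minimal thread-based sketch of a worker loop. It assumes, hypothetically, that each queue element is an (id, data) pair (as described for Server's book-keeping) and that None is the stop sentinel mentioned under Servlet.stop(). The function worker_loop is illustrative only and is not part of mpservice. The real loop also handles batching and preprocessing.

```python
import queue
import threading


def worker_loop(q_in: queue.Queue, q_out: queue.Queue, call) -> None:
    """Hypothetical sketch of a worker's main loop (not mpservice's code).

    Assumes each queue element is an (id, data) pair and that None is the
    stop sentinel; batching and preprocessing are omitted.
    """
    while True:
        item = q_in.get()
        if item is None:  # stop sentinel: exit the loop
            break
        uid, x = item
        try:
            y = call(x)
        except Exception as e:
            y = e  # pass the exception object downstream instead of crashing
        q_out.put((uid, y))  # exactly one result per input element


q_in, q_out = queue.Queue(), queue.Queue()
t = threading.Thread(target=worker_loop, args=(q_in, q_out, lambda x: 10 // x))
t.start()
for i, x in enumerate([1, 2, 0]):
    q_in.put((i, x))
q_in.put(None)
t.join()
results = [q_out.get() for _ in range(3)]
```

With a single worker the results come out in input order. The third input fails with ZeroDivisionError, and that exception object is delivered as the third result.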
- __init__(*, worker_index: int, batch_size: int | None = None, batch_wait_time: float | None = None, cpu_affinity: int | list[int] | None = None)
The main concern here is to set up controls for “batching” via the two parameters batch_size and batch_wait_time.

If the algorithm cannot vectorize the computation, then there is no advantage in enabling batching. In that case, the subclass should simply fix batch_size to 0 in its __init__ and invoke super().__init__ accordingly.

The __init__ of a subclass may define additional input parameters; they can be passed in through run().

- Parameters:
- worker_index
0-based sequential number of the worker in a “servlet”. A subclass may use this to distinguish the worker processes/threads in the same Servlet and treat them somewhat differently, although they do essentially the same thing. For example, each worker could use one particular GPU.
This argument is provided by Servlet.start() when it starts the worker. A subclass does not need to provide this argument; it simply uses it if needed. The parameter has a proper value in __init__ and remains available as an instance attribute.
- batch_size
Max batch size; see call().
Remember to pass in batch_size consistently with the implementation of call(). In other words, if batch_size > 0, then call() must handle a list input that contains a batch of elements. If batch_size is 0, then the input to call() is a single element.
If None, then 0 is used, meaning no batching.
If batch_size=1, then processing is batched in form only, without the speed benefits of batching.
- batch_wait_time
Seconds, may be 0; the total duration to wait for one batch after the first item has arrived.
For example, suppose batch_size is 100 and batch_wait_time is 1. After the first item has arrived, if at least 99 more items arrive within 1 second, then a batch of 100 elements will be produced. If fewer than 99 items arrive within 1 second, then the wait stops at 1 second and a batch of fewer than 100 elements is produced; the batch could have only one element.
If 0, then there is no wait. After the first element is obtained, any further elements already in q_in “right there right now” are retrieved until a batch of batch_size elements is produced. Whenever q_in is empty, the collection stops, and the elements collected so far (fewer than batch_size of them) make a batch. In other words, batching happens only for items that have already “piled up” in q_in at that moment.
To leverage batching, it is recommended to set batch_wait_time to a small positive value. It should be small, so that there is not much futile waiting. It should be positive (as opposed to 0), so that the worker always waits a little in case more elements are coming in.
When batch_wait_time > 0, performance suffers during sequential calls (i.e. sending a request with a single element, waiting for the result, then sending the next, and so on). This worker always waits that long for additional items to form a batch, yet no additional items arrive during sequential calls. However, sequential calls are not the intended use case when batching is enabled. Keep this factor in mind when benchmarking.
If batch_size is 0 or 1, then batch_wait_time should be left unspecified; if it is specified, the only valid value is 0.
If batch_size > 1, then batch_wait_time is 0.01 by default.
- cpu_affinity
Which CPUs this worker process is “pinned” to. If None, no pinning.
If this worker is used in a ThreadServlet, this parameter is not specified in the call; its value is None and it is unused.
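The batching rules for batch_size and batch_wait_time can be sketched with the standard-library queue module. This is a hypothetical illustration of the described behavior, assuming q_in is a queue.Queue. collect_batch is not an mpservice function.

```python
import queue
import time


def collect_batch(q_in: queue.Queue, batch_size: int, batch_wait_time: float) -> list:
    """Hypothetical sketch of batch collection (not mpservice's code)."""
    batch = [q_in.get()]  # block until the first item arrives
    if batch_wait_time == 0:
        # No waiting: only take items that are already piled up in the queue.
        while len(batch) < batch_size:
            try:
                batch.append(q_in.get_nowait())
            except queue.Empty:
                break
        return batch
    # Otherwise wait at most batch_wait_time seconds in total after the first item.
    deadline = time.monotonic() + batch_wait_time
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(q_in.get(timeout=remaining))
        except queue.Empty:
            break  # the wait period is over; emit a partial batch
    return batch
```

With five items queued, batch_size=3, and batch_wait_time=0, two calls return [0, 1, 2] and then [3, 4]. With a single queued item and a positive wait time, the call blocks for about batch_wait_time and returns a one-element batch. This matches the sequential-call cost described above.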
- preprocess: Callable[[Any], Any]
If a subclass has a method preprocess, or an attribute preprocess that is a free-standing function, this method or function must take one data element (not a batch) as its sole positional argument. It processes/transforms the data, and its output is used in call(). If this function raises an exception, the element is not sent to call(). Instead, the exception object is short-circuited to the output queue.
When self.batch_size > 1, call() may need to deal with batch elements that fail a pre-condition. Routing the “good” and “bad” elements to further processing, or to the output, in the right order is tedious. The preprocess mechanism helps in that situation.
When a subclass is designed to do non-batching work, this attribute is not necessary, because the same concern can be handled in call() directly.
When self.preprocess is defined, it is used in _start_single() and _build_input_batches().
If a Server contains a single servlet, which uses this Worker, then the functionality of self.preprocess can largely be provided by the parameter preprocessor to Server.stream(). In that case, there is no need for self.preprocess.
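The benefit of preprocess for batched workers can be sketched as follows. Each element is preprocessed individually, and failures are kept as exception objects at their positions. Only the good elements go to the vectorized call(), and the results are merged back in order. process_batch is a hypothetical helper, not part of mpservice, and the real worker may arrange this differently.

```python
def process_batch(batch: list, preprocess, call) -> list:
    """Hypothetical sketch: per-element preprocess, vectorized call on the
    good elements only, results merged back in the original order."""
    results = [None] * len(batch)
    good_idx, good_vals = [], []
    for i, x in enumerate(batch):
        try:
            good_vals.append(preprocess(x))
            good_idx.append(i)
        except Exception as e:
            results[i] = e  # short-circuit the exception to the output
    if good_vals:
        # call() receives a list and returns a list of the same length.
        for i, y in zip(good_idx, call(good_vals)):
            results[i] = y
    return results


def check_non_negative(x):
    # Example pre-condition check (hypothetical).
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x


out = process_batch([1, -1, 3], check_non_negative, lambda xs: [v * 10 for v in xs])
```

Here out is [10, ValueError(...), 30]. The vectorized function never sees the invalid element, yet the output stays aligned with the input.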
- call(x)
Private methods of this class wait on the input queue to gather “work orders”, send them to call() for processing, collect the outputs of call(), and put them in the output queue.
If self.batch_size == 0, then x is a single element, and this method returns the result for x.
x is not an Exception or RemoteException object; such a value would have been routed to the outgoing pipe and not passed to this method. The same is true for the elements of x when self.batch_size > 0.
If self.batch_size > 0 (including 1), then x is a list of input data elements, and this method returns a list (or Sequence) of results corresponding to the elements in x. When the private methods of this class receive this output, they split it and put each result in the output queue individually. As a result, the elements in the output queue (q_out) correspond to the elements in the input queue (q_in), even though vectorized computation, or batching, has happened internally.
When batching is enabled (i.e. when self.batch_size > 0), the number of elements in x varies between calls depending on the supply in the input queue. The list x does not have a fixed length.
Be sure to distinguish the case with batching (batch_size > 0) from the case without batching (batch_size = 0) where a single input is a list. In the latter case, the output of this method is the result corresponding to the single input x. The result could be anything; it may or may not be a list.
If a subclass fixes batch_size in its __init__ to be 0 or nonzero, make sure this method is implemented accordingly.
If __init__ does not fix the value of batch_size, then a particular instance may have been created with or without batching. In this case, this method needs to check self.batch_size and act accordingly.
If this method raises exceptions, do not handle them unless you have something specific to do; just let them happen. They will be handled in the private methods of this class that call this method.
Usually this is the only method a subclass needs to customize. In rare cases, a subclass may want to customize stream() instead of or in addition to call().
- stream(xx: Iterable) -> Iterator
xx is an iterable of inputs x to call(). (If self.batch_size > 0, then xx is an iterable of batches.) This function yields the results of call() for the elements of xx, in the right order. If any invocation of call() raises an exception, the exception object is yielded.
The elements of xx (or the elements of those elements when self.batch_size > 0) are not instances of Exception or RemoteException. Such values would have been routed to the outgoing pipe and not passed to this method.
The background loop in start() calls this method and does not call call() directly.
This method is provided mainly for the special use cases where a subclass wants to set self.num_stream_threads to a positive number, thereby using threading concurrency in this method.
If a subclass re-implements this method without calling call(), then call() does not need to be implemented, because this method is the only place in this class that calls call().
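The contract of stream() can be sketched as follows. This is a hypothetical illustration, and neither function is part of mpservice. sequential_stream follows the documented behavior: results come out in order, and exceptions are yielded as objects. threaded_stream only suggests what a threaded re-implementation might look like; it does not use the num_stream_threads attribute.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor


def _safe_call(call: Callable, x):
    # Return the exception object instead of raising it.
    try:
        return call(x)
    except Exception as e:
        return e


def sequential_stream(xx: Iterable, call: Callable) -> Iterator:
    """Sketch of the documented contract: yield call(x) for each x, in order."""
    for x in xx:
        yield _safe_call(call, x)


def threaded_stream(xx: Iterable, call: Callable, num_threads: int) -> Iterator:
    """Sketch of a threaded variant; results still come out in input order.
    Executor.map collects all of xx up front, so this suits finite inputs only."""
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        yield from pool.map(lambda x: _safe_call(call, x), xx)
```

For example, list(sequential_stream([4, 0, 2], lambda x: 8 // x)) gives 2, a ZeroDivisionError object, and 4, in that order. The threaded variant produces the same ordering because Executor.map returns results in submission order.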
- mpservice.mpserver.make_worker(func: Callable[[Any], Any]) -> type[Worker]
This function defines and returns a simple Worker subclass for quick, “on-the-fly” use. This can be useful when we want to introduce simple servlets for pre-processing and post-processing.
- Parameters:
- func
This function is what happens in the method call().
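The idea behind make_worker() can be sketched as a small class factory. MiniWorker below is a hypothetical stand-in for Worker, so the example runs without mpservice. The real make_worker() may differ in details such as class naming or pickling support.

```python
class MiniWorker:
    """Hypothetical minimal stand-in for mpserver.Worker (illustration only)."""

    def __init__(self, *, worker_index: int = 0, batch_size: int = 0):
        self.worker_index = worker_index
        self.batch_size = batch_size

    def call(self, x):
        raise NotImplementedError


def make_mini_worker(func):
    """Sketch of the idea behind make_worker(): build a subclass whose call() is func."""

    class _Worker(MiniWorker):
        def call(self, x):
            return func(x)

    _Worker.__name__ = f"Worker_{func.__name__}"
    return _Worker


# The same function as in the Example's ThreadServlet.
W = make_mini_worker(lambda x: (x[0] + x[1]) * x[2])
print(W().call(['w', 'd', 5]))  # wdwdwdwdwd
```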
- mpservice.mpserver.PassThrough = <class 'mpservice.mpserver._worker.PassThrough'>
Example use of this class:

```python
from mpservice.mpserver import (
    EnsembleServlet, PassThrough, ProcessServlet, SequentialServlet, ThreadServlet, make_worker,
)

# W1, W2, W3 are assumed to be user-defined Worker subclasses.


def combine(x):
    '''
    Combine the ensemble elements depending on the results
    as well as the original input.
    '''
    x, *y = x
    assert len(y) == 3
    if x < 100:
        return sum(y) / len(y)
    else:
        return max(y)


s = EnsembleServlet(
    ThreadServlet(PassThrough),
    ProcessServlet(W1),
    ProcessServlet(W2),
    ProcessServlet(W3),
)
ss = SequentialServlet(s, ThreadServlet(make_worker(combine)))
```
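Because combine() is plain Python, its logic can be checked without starting any servlet. The sketch below assumes, as the example implies, that the ensemble output is a list whose first entry is the original input (passed through by PassThrough), followed by the three worker results. The numbers are made up for illustration.

```python
def combine(x):
    # x is [original_input, result_w1, result_w2, result_w3]
    x, *y = x
    assert len(y) == 3
    if x < 100:
        return sum(y) / len(y)  # small inputs: average the three results
    else:
        return max(y)  # large inputs: take the largest result


print(combine([50, 1.0, 2.0, 6.0]))   # 3.0
print(combine([150, 1.0, 2.0, 6.0]))  # 6.0
```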
Servlets
- class mpservice.mpserver.Servlet
Bases: ABC

A Servlet manages the execution of one or more Workers. We distinguish between “simple” servlets, including ProcessServlet and ThreadServlet, and “compound” servlets, including SequentialServlet, EnsembleServlet, and SwitchServlet.

A simple servlet arranges to execute one Worker in the requested number of processes (or threads). Optionally, it can specify exactly which CPU(s) each worker process should use. Each input item is passed to and processed by exactly one of the processes (or threads).

A compound servlet arranges to execute multiple Servlets as a sequence or an ensemble. In addition, SwitchServlet acts as a “switch” in front of a set of Servlets. This definition is recursive: a member servlet can itself be a compound servlet.

This recursive definition is very powerful. In principle, we can freely compose and nest the Servlet types. For example, suppose W1, W2, …, are Worker subclasses; then we may design a workflow like this:

```python
s = SequentialServlet(
    ProcessServlet(W1),
    EnsembleServlet(
        ThreadServlet(W2),
        SequentialServlet(ProcessServlet(W3), ThreadServlet(W4)),
    ),
    EnsembleServlet(
        SequentialServlet(ProcessServlet(W5), ProcessServlet(W6)),
        SequentialServlet(ProcessServlet(W7), ThreadServlet(W8), ProcessServlet(W9)),
    ),
)
```
- abstract stop() -> None
When this method is called, there shouldn't be any “pending” work in the servlet. If, for whatever reason, the servlet is in a busy, messy state, this method is not required to “wait” for things to finish. The priority is to ensure that the processes and threads are stopped.
The primary mechanism for stopping a servlet is to put the special constant None in the input queue. The user should already have done that; to be safe, this method may do it again.
- abstract property input_queue_type: Literal['thread', 'process']
Indicates whether the input queue can be a “thread queue” (i.e. queue.Queue) or needs to be a “process queue” (i.e. multiprocessing.queues.Queue). If “thread”, the caller can provide either a thread queue or a process queue. If “process”, the caller must provide a process queue.
- abstract property output_queue_type: Literal['thread', 'process']
Indicates whether the output queue can be a “thread queue” (i.e. queue.Queue) or needs to be a “process queue” (i.e. multiprocessing.queues.Queue). If “thread”, the caller can provide either a thread queue or a process queue. If “process”, the caller must provide a process queue.
- abstract property children: list
- class mpservice.mpserver.ProcessServlet
Bases: Servlet

Use this class if the operation is CPU-bound.
- __init__(worker_cls: type[Worker], *, cpus: None | int | Sequence[None | int | Sequence[int]] = None, worker_name: str | None = None, **kwargs)
- Parameters:
- worker_cls
A subclass of Worker.
- cpus
Specifies how many processes to create and how they are pinned to specific CPUs.
The default is None, indicating a single unpinned process.
If an int, it is the number of (unpinned) processes.
Otherwise, it is a list specifying CPU pinning. Each element of the list specifies the CPU pinning of one process, and the number of processes created is the number of elements in cpus. The CPU spec is very flexible. For example,
cpus=[[0, 1, 2], [0], [2, 3], [4, 5, 6], 4, None]
This instructs the servlet to create 6 processes, each running an instance of worker_cls. The CPU affinity of each process is as follows:
- Process 1: CPUs 0, 1, 2
- Process 2: CPU 0
- Process 3: CPUs 2, 3
- Process 4: CPUs 4, 5, 6
- Process 5: CPU 4
- Process 6: any CPU, no pinning
- worker_name
Prefix to the name of the worker processes. If not provided, a default is constructed based on the class name.
- **kwargs
Passed to the __init__ method of worker_cls.
Notes
When the servlet has multiple processes, the output stream does not need to follow the order of the elements in the input stream.
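The accepted forms of cpus can be summarized by a small normalization function that maps the spec to one affinity entry per process. normalize_cpus is a hypothetical helper, not part of mpservice. On Linux, a worker process could apply such an entry with os.sched_setaffinity(0, cpus). The docs above do not say whether mpservice pins processes this way.

```python
def normalize_cpus(cpus):
    """Hypothetical sketch: turn a `cpus` spec into one affinity entry per process.
    In the result, None means "no pinning" and a list gives the allowed CPUs."""
    if cpus is None:
        return [None]  # default: a single unpinned process
    if isinstance(cpus, int):
        return [None] * cpus  # `cpus` unpinned processes
    out = []
    for spec in cpus:
        if spec is None:
            out.append(None)  # this process may use any CPU
        elif isinstance(spec, int):
            out.append([spec])  # pinned to a single CPU
        else:
            out.append(list(spec))  # pinned to a set of CPUs
    return out


print(normalize_cpus([[0, 1, 2], [0], [2, 3], [4, 5, 6], 4, None]))
# [[0, 1, 2], [0], [2, 3], [4, 5, 6], [4], None]
```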
- start(q_in: _SimpleProcessQueue, q_out: _SimpleProcessQueue)
Create the requested number of processes, in each starting an instance of self._worker_cls.
- Parameters:
- q_in
A queue with input elements. Each element will be passed to and processed by exactly one worker process.
- q_out
A queue for results.
- property input_queue_type
- property output_queue_type
- property workers
- property children
- class mpservice.mpserver.ThreadServlet
Bases: Servlet

Use this class if the operation is I/O-bound (e.g. calling an external service to get some info) and computation is very light compared to the I/O part. Another use case for this class is very simple, quick pre-processing or post-processing.
- __init__(worker_cls: type[Worker], *, num_threads: int | None = None, worker_name: str | None = None, **kwargs)
- Parameters:
- worker_cls
A subclass of Worker.
- num_threads
The number of threads to create. Each thread hosts and runs an instance of worker_cls. The default None means 1.
- worker_name
Prefix to the name of the worker threads. If not provided, a default is constructed based on the class name.
- **kwargs
Passed on to the __init__ method of worker_cls.
Notes
When the servlet has multiple threads, the output stream does not need to follow the order of the elements in the input stream.
- start(q_in: _SimpleProcessQueue | _SimpleThreadQueue, q_out: _SimpleProcessQueue | _SimpleThreadQueue)
Create the requested number of threads, in each starting an instance of self._worker_cls.
- Parameters:
- q_in
A queue with input elements. Each element will be passed to and processed by exactly one worker thread.
- q_out
A queue for results.
q_in and q_out are either _SimpleProcessQueues (for processes) or _SimpleThreadQueues (for threads). Because this servlet may be connected to either ProcessServlets or ThreadServlets, either type of queue may be appropriate. In contrast, for ProcessServlet, the input and output queues are both _SimpleProcessQueues.
- property input_queue_type
- property output_queue_type
- property workers
- property children
- class mpservice.mpserver.SequentialServlet
Bases: Servlet

A SequentialServlet represents a sequence of operations performed in order, with each operation's output becoming the next operation's input.
Each operation is performed by a “servlet”, that is, an instance of any subclass of Servlet, including SequentialServlet. However, nesting a SequentialServlet directly inside another SequentialServlet brings no benefit; you may as well flatten it out.
If any member servlet has multiple workers (threads or processes), the output stream does not need to follow the order of the elements in the input stream.
- start(q_in, q_out)
Start the member servlets.
A main concern is to connect the servlets with “pipes”, or queues, for input and output. These are in addition to the very first q_in, which carries input items from the “outside world”, and the very last q_out, which sends output items to the “outside world”.
Each item in q_in goes to the first member servlet; the result goes to the second servlet; and so on. The result out of the last servlet goes to q_out.
The types of q_in and q_out are decided by the caller. The types of the intermediate queues are decided within this function. As a rule, use _SimpleThreadQueue between two threads; use _SimpleProcessQueue between two processes or between a process and a thread.
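The queue-type rule can be written as a small function over the members' (input_queue_type, output_queue_type) pairs. intermediate_queue_types is a hypothetical helper, not an mpservice API. It assumes a ThreadServlet reports 'thread' for both types and a ProcessServlet reports 'process', consistent with the descriptions of input_queue_type and output_queue_type.

```python
from typing import Literal

QueueType = Literal['thread', 'process']


def intermediate_queue_types(members: list[tuple[QueueType, QueueType]]) -> list[QueueType]:
    """Hypothetical sketch of the rule in SequentialServlet.start().

    Each member is described by its (input_queue_type, output_queue_type);
    returns the type of each queue placed between consecutive members."""
    types: list[QueueType] = []
    for (_, out_prev), (in_next, _) in zip(members, members[1:]):
        # A thread queue suffices only when both neighbors are thread-based.
        if out_prev == 'thread' and in_next == 'thread':
            types.append('thread')
        else:
            types.append('process')
    return types


PROC = ('process', 'process')  # e.g. a ProcessServlet
THRD = ('thread', 'thread')    # e.g. a ThreadServlet
print(intermediate_queue_types([PROC, THRD, THRD, PROC]))
# ['process', 'thread', 'process']
```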
- property workers
- property children
- property input_queue_type
- property output_queue_type
- class mpservice.mpserver.EnsembleServlet
Bases: Servlet

An EnsembleServlet represents an ensemble of operations performed in parallel on each input item. The list of results, in the order of the operators, is returned as the result.
Each operation is performed by a “servlet”, that is, an instance of any subclass of Servlet.
If fail_fast is True (the default), then as soon as one ensemble member raises an Exception, an EnsembleError is returned. Results of the other ensemble members that arrive afterwards are ignored. This is not necessarily “fast”; the main point is that the item in question results in an Exception rather than a list that contains Exception(s).
If fail_fast is False, then Exception results, if any, are included in the result list. If all entries in the list are Exceptions, then the result list is replaced by an EnsembleError. The reasoning is as follows. If the result list contains some valid results and some Exceptions, the user may consider it “partially” successful and may choose to make use of the valid results in some way. If every ensemble member has failed, then the ensemble has failed, so the result is a single EnsembleError.
The output stream does not need to follow the order of the elements in the input stream.
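The fail_fast rules can be expressed as a function over one item's member results. This is a hypothetical sketch. It assumes all member results have already been collected, whereas the real servlet may finalize an item as soon as the first exception arrives. EnsembleError here is a local stand-in for mpservice's exception, whose constructor arguments are not documented above.

```python
class EnsembleError(Exception):
    """Local stand-in for mpservice's EnsembleError (illustration only)."""


def combine_ensemble(results: list, fail_fast: bool = True):
    """Hypothetical sketch of the fail_fast rules for one input item.
    `results` holds each member's output, in member order."""
    errors = [r for r in results if isinstance(r, Exception)]
    if fail_fast and errors:
        return EnsembleError(errors[0])  # any failure fails the whole item
    if errors and len(errors) == len(results):
        return EnsembleError(errors)  # every member failed
    return results  # all valid, or a mix of valid results and exceptions
```

With fail_fast=False, [1, ValueError('x'), 3] is returned as-is, so the caller can still use the two valid results. With the default fail_fast=True, the same input becomes a single EnsembleError.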
- start(q_in, q_out)
Start the member servlets.
A main concern is to wire up the parallel execution of all the servlets on each input item.
q_in and q_out carry inputs from and outputs to the “outside world”. Their types, either _SimpleProcessQueue or _SimpleThreadQueue, are decided by the caller.
- property workers
- property children
- property input_queue_type
- property output_queue_type
- class mpservice.mpserver.SwitchServlet
Bases: Servlet

SwitchServlet contains multiple member servlets (which are provided to __init__()). Each input element is passed to and processed by exactly one of the members, based on the output of the method switch().
This is somewhat analogous to the “switch” construct in some programming languages.
- abstract switch(x) -> int
- Parameters:
- x
An element received via the input queue, that is, the parameter q_in to start(). If the current servlet is preceded by another servlet, then x is an output of that servlet.
In principle, this method should not modify x. It is expected to be a quick check on x to determine which member servlet should process it.
x is never an instance of Exception or RemoteException. That case has already been handled before this method is called. This method should not be used to handle such exception cases.
- Returns:
- int
The index of the member servlet that will receive and process x. This number is between 0 and the number of member servlets minus 1, inclusive.
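In an actual SwitchServlet subclass, switch() would be implemented as a method. The standalone sketch below does not use mpservice. It shows a hypothetical switch rule and how its return value could route each element to exactly one member queue. Both switch and dispatch are illustrative names, not mpservice code.

```python
import queue


def switch(x) -> int:
    """Example switch rule (hypothetical): short strings go to member 0,
    everything else goes to member 1. It only inspects x and never modifies it."""
    return 0 if isinstance(x, str) and len(x) < 10 else 1


def dispatch(items, member_queues: list, switch_fn) -> None:
    """Sketch of routing: each element goes to exactly one member's input queue."""
    for x in items:
        idx = switch_fn(x)
        if not 0 <= idx < len(member_queues):
            raise IndexError(f"switch returned {idx}, expected 0..{len(member_queues) - 1}")
        member_queues[idx].put(x)


member_queues = [queue.Queue(), queue.Queue()]
dispatch(['hi', 'a' * 20, 42], member_queues, switch)
```

After dispatching, member 0 holds 'hi', and member 1 holds the long string and 42.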
- property input_queue_type
- property output_queue_type
- property workers
- property children
Server
- class mpservice.mpserver.Server
Bases: object

The “interfacing” and “scheduling” code of Server runs in the “main process”. Two usage patterns are supported: making individual calls to the service to get individual results, or flowing a (potentially unlimited) stream of data through the service to get a stream of results.
A typical setup looks like this:

```python
servlet = SequentialServlet(...)  # or other types of servlets
server = Server(servlet)
with server:
    z = server.call('abc')
    for x, y in server.stream(data, return_x=True):
        print(x, y)
```
Code in the “workers” (of servlet) should raise exceptions as it normally does, without handling them, if it considers the situation to be non-recoverable, e.g. input is of wrong type. The exceptions will be funneled through the pipelines and raised to the end-user with useful traceback info.
The user’s main work is implementing the operations in the “workers”. Another task (of some trial and error) by the user is experimenting with CPU allocations among workers to achieve best performance.
Server has an async counterpart named AsyncServer.
- final classmethod get_mp_context()
If subclasses need additional Queues, Locks, Conditions, etc., they should create them from this context. This returns a spawn context.
Subclasses should not customize this method.
- __init__(servlet: Servlet, *, capacity: int = 256)
- Parameters:
- servlet
The servlet to be run by this server.
The servlet has not been “started”. Its start() will be called in __enter__().
- capacity
Max number of requests concurrently in progress within this server, across all pipes/servlets/stages combined.
Each request received is assigned a UUID, and an entry is added to an internal book-keeping dict. The ID and the input data then enter the processing pipeline together. Coming out of the pipeline is the ID along with the result. The book-keeping record is looked up by the ID, used, and removed from the dict. Removing finished requests from the book-keeping dict makes room for new requests. The capacity value is simply the size limit of this internal book-keeping dict.
- property capacity: int
The value of the parameter capacity to __init__().
- property backlog: int
The number of items currently being processed in the server. They may be in various stages.
- __enter__()
The main operations conducted in this method include:
- Start the servlet (hence creating and starting Worker instances).
- Set up queues between the main thread (this “server” object) and the workers for passing input elements, intermediate results, and results.
- Set up background threads responsible for taking incoming calls and gathering/returning results. Roughly speaking, when call() is invoked, a thread places its input in the pipeline with appropriate book-keeping. The call then waits for the result, which becomes available once another thread gathers it from the other end of the pipeline.
- __exit__(*args)
The main operations conducted in this method include:
- Place a special sentinel in the pipeline to indicate the end of operations; all Workers in the servlet will eventually see the sentinel and exit.
- Wait for the servlet and all helper threads to exit.
- call(x, /, *, timeout: int | float = 60, backpressure: bool = True)
Serve one request with input x, and return the result.
This method is thread-safe, meaning it can be called from multiple threads concurrently.
If the operation fails due to ServerBacklogFull, TimeoutError, or any exception in any part of the server, the exception is propagated.
- Parameters:
- x
Input data element.
- timeout
In seconds. If the result is not ready after this time, TimeoutError is raised.
A timeout can happen in two situations. First, x is placed in an input queue for processing; this step is called “enqueue”. If the queue is currently full, the code waits (if backpressure is False). If a spot does not become available within the timeout period, a TimeoutError is raised with the message “… seconds enqueue”.
Effectively, the input queue is considered full if there are capacity ongoing (i.e. received but not yet finished) requests in the server across all stages combined, where capacity is a parameter to __init__().
Second, once x is placed in the input queue, the code waits for the result to come out at the end of an output queue. If the result is not yet ready when the timeout period is over, the TimeoutError message will be “… seconds total”. This wait period includes the time already spent in the “enqueue” step. In other words, the timer starts upon receiving the request, at the beginning of call.
- backpressure
If True and the input queue is full (that is, self.backlog == self.capacity), do not wait; raise ServerBacklogFull right away. If False, wait on the input queue for up to timeout seconds.
The exception ServerBacklogFull may indicate an erroneous situation, where the server is somehow (almost) stuck and cannot process requests as quickly as expected. It may also indicate a valid situation, where the server is receiving requests faster than its expected load.
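The interplay of capacity, backpressure, and the “enqueue” timeout can be sketched with a threading.Condition guarding a book-keeping dict. Everything below is a hypothetical illustration. Bookkeeper and its methods are not mpservice APIs, and ServerBacklogFull is a local stand-in for the real exception. The result-waiting (“… seconds total”) stage is omitted.

```python
import threading
import time
import uuid


class ServerBacklogFull(RuntimeError):
    """Local stand-in for mpservice's exception of the same name."""


class Bookkeeper:
    """Hypothetical sketch of the capacity/backpressure rules described above;
    not mpservice's implementation."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._pending: dict[str, object] = {}  # request ID -> input data
        self._cond = threading.Condition()

    @property
    def backlog(self) -> int:
        with self._cond:
            return len(self._pending)

    def enqueue(self, x, *, timeout: float = 60, backpressure: bool = True) -> str:
        deadline = time.monotonic() + timeout
        with self._cond:
            while len(self._pending) >= self.capacity:
                if backpressure:
                    raise ServerBacklogFull(len(self._pending))  # do not wait
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(f"{timeout} seconds enqueue")
                self._cond.wait(remaining)  # woken up when a request finishes
            uid = str(uuid.uuid4())  # each request gets a UUID
            self._pending[uid] = x
            return uid

    def finish(self, uid: str) -> None:
        with self._cond:
            del self._pending[uid]  # removal makes room for new requests
            self._cond.notify()
```

With capacity=2, a third enqueue fails immediately under backpressure=True. With backpressure=False, it waits up to timeout seconds. Calling finish() on an earlier request frees a slot.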
- debug_info() -> dict
Return a dict with various pieces of information about the status and health of the server that may be helpful for debugging. Example content includes self.backlog, the active child processes, and the status (alive/dead) of helper threads.
The values in the returned dict are str, int, or other JSON-friendly types.
- stream(data_stream: Iterable, /, *, return_x: bool = False, return_exceptions: bool = False, timeout: int | float = 3600, preprocessor: Callable | None = None) -> Iterator
Use this method for high-throughput processing of a long stream of data elements. In theory, this method achieves the server's upper bound on throughput, because it saturates the pipeline.
The order of elements in the stream is preserved, i.e., elements in the output stream correspond to elements in the input stream in the same order.
This method is thread-safe; that is, multiple threads can call this method concurrently with their respective input streams. The server will serve the requests interleaved as fast as it can. In that case, you may want to use return_exceptions=True in each of the calls, so that an exception in one stream does not propagate and halt the program.
It is also fine to call call() and stream() concurrently (from multiple threads, for example).
- Parameters:
- data_stream
A (possibly unlimited) iterable of input data elements.
- return_x
If True, each output element is a length-two tuple containing the input data and the result. If False, each output element is just the result.
- return_exceptions
If True, any Exception object is produced in the output stream in place of the would-be regular result. If False, exceptions are propagated right away, crashing the program.
- timeout
Interpreted the same as in call().
In a streaming task, “timeout” is usually less of a concern than overall throughput. You can usually leave it at the default value, or make it even larger as needed.
- preprocessor
See fifo_stream().
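Member servlets with multiple workers may emit results out of order, yet stream() preserves input order. One way to restore order is to buffer early arrivals until the next expected result shows up. The sketch below assumes each result arrives tagged with its input's position. mpservice itself tags requests with UUIDs and may use a different mechanism. reorder is a hypothetical helper that also mimics return_x and return_exceptions.

```python
from collections.abc import Iterable, Iterator


def reorder(tagged_results: Iterable[tuple[int, object]], inputs: list,
            *, return_x: bool = False, return_exceptions: bool = False) -> Iterator:
    """Hypothetical sketch: results arrive as (index, result) pairs in any order;
    yield them in input order, honoring return_x and return_exceptions."""
    buffer: dict[int, object] = {}
    next_idx = 0
    for idx, y in tagged_results:
        buffer[idx] = y
        while next_idx in buffer:  # release every result that is now in order
            y = buffer.pop(next_idx)
            if isinstance(y, Exception) and not return_exceptions:
                raise y  # propagate right away
            yield (inputs[next_idx], y) if return_x else y
            next_idx += 1


arrivals = [(2, 'C'), (0, 'A'), (1, 'B')]  # completion order differs from input order
print(list(reorder(arrivals, ['a', 'b', 'c'], return_x=True)))
# [('a', 'A'), ('b', 'B'), ('c', 'C')]
```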
- class mpservice.mpserver.AsyncServer
Bases: object

An AsyncServer object must be started in an async context manager. The primary methods call() and stream() are async.
Most concepts and usage are analogous to Server.
- property capacity: int
- property backlog: int
- async call(x, /, *, timeout: int | float = 60, backpressure: bool = True)
When this is called, the server is usually backing an HTTP (or other) service built with some async framework. Concurrent async calls to this method may happen.
See also
- async stream(data_stream: AsyncIterable, /, *, return_x: bool = False, return_exceptions: bool = False, timeout: int | float = 3600, preprocessor: Callable | None = None) -> AsyncIterator
Calls to stream() and call() can happen at the same time (i.e. interleaved); multiple calls to stream() by different “users” can also happen at the same time (in the same thread).
See also
- exception mpservice.mpserver.TimeoutError
Bases:
TimeoutError
Answer to the Example challenge: when we run the script, this is the printout:

```
$ python test.py
Hello, wdwdwdwdwd!
Hello, 20!
```
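To see where these two lines come from, the flow can be traced in plain Python, without processes, by applying each step in order. This sketch mirrors the servlet spec from the Example. The function names are illustrative and do not use mpservice.

```python
def get_head(x):
    return x[0]


def get_tail(x):
    return x[-1]


def get_len(x):
    return len(x)


def solute(x):
    return f"Hello, {x}!"


def flow(x):
    # EnsembleServlet: every member sees the same input; results keep member order.
    ensemble_out = [get_head(x), get_tail(x), get_len(x)]
    # ThreadServlet: (first + last) * length.
    combined = (ensemble_out[0] + ensemble_out[1]) * ensemble_out[2]
    # ProcessServlet(Solute): the final result.
    return solute(combined)


print(flow('world'))       # Hello, wdwdwdwdwd!
print(flow([1, 2, 3, 4]))  # Hello, 20!
```

For 'world', the ensemble yields ['w', 'd', 5], so ('w' + 'd') * 5 repeats the two-character string five times. For [1, 2, 3, 4], it yields [1, 4, 4], so (1 + 4) * 4 = 20.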