Additional utilities#
Threading#
- class mpservice.threading.Thread[source]#
Bases: Thread
A subclass of the standard threading.Thread, this class makes the result or exception produced in a thread accessible from the thread object itself. This makes the Thread object’s behavior similar to the Future object returned by concurrent.futures.ThreadPoolExecutor.submit.
- join(timeout=None)[source]#
Same behavior as the standard lib, except that if the thread terminates with an exception, the exception is raised.
- done() bool[source]#
Return True if the thread has terminated. Return False if the thread is running or not yet started.
- throw(exc: BaseException | Type[BaseException]) None[source]#
Raise exception exc inside the thread.
Do not use this unless you know what you’re doing. A main intended user is terminate().
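The Future-like behavior described for this class can be illustrated with the standard library alone. The following is a minimal sketch, not mpservice's implementation: the class name ResultThread and the result() accessor are hypothetical names chosen for this example, and only join() and done() mirror methods documented above.

```python
import threading


class ResultThread(threading.Thread):
    """Hypothetical stand-in for illustration; not mpservice's implementation."""

    def __init__(self, target, args=(), kwargs=None):
        super().__init__()
        self._fn = target
        self._fn_args = args
        self._fn_kwargs = kwargs or {}
        self._result = None
        self._exception = None

    def run(self):
        # Capture the outcome instead of letting it vanish with the thread.
        try:
            self._result = self._fn(*self._fn_args, **self._fn_kwargs)
        except BaseException as exc:
            self._exception = exc

    def done(self):
        # ident is None until start(); is_alive() is False after termination.
        return self.ident is not None and not self.is_alive()

    def join(self, timeout=None):
        super().join(timeout)
        # Re-raise only once the thread has actually finished.
        if self.done() and self._exception is not None:
            raise self._exception

    def result(self):
        # Hypothetical accessor, named after Future.result().
        self.join()
        return self._result
```

A caller would start the thread as usual and then call join() or result(); a failure in the target surfaces in the caller at that point rather than being lost in the worker thread.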
Concurrent Futures#
- class mpservice.concurrent.futures.ProcessPoolExecutor[source]#
Bases: ProcessPoolExecutor
This class is a drop-in replacement of the standard concurrent.futures.ProcessPoolExecutor. By default, it uses a “spawn” context and the process class SpawnProcess.
In addition, the parameter loud_exception controls whether to print out exception info if the submitted worker task fails with an exception. The default is True, whereas False has the behavior of the standard library, which does not print exception info in the worker process. Although exception info can be obtained in the caller process via the Future object returned from the method submit(), the printing in the worker process is handy for debugging in cases where the user fails to check the Future object in a timely manner.
- class mpservice.concurrent.futures.ThreadPoolExecutor[source]#
Bases: ThreadPoolExecutor
This class is a drop-in replacement of the standard concurrent.futures.ThreadPoolExecutor. The parameter loud_exception controls whether to print out exception info if the submitted worker task fails with an exception. The default is True, whereas False has the behavior of the standard library, which does not print exception info in the worker thread.
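To make the idea behind loud_exception concrete, here is a rough sketch that uses only the standard library executor. The wrapper name loud and its stream parameter are hypothetical and exist only in this example; how mpservice implements the option internally is not stated here and is not assumed.

```python
import concurrent.futures
import functools
import io
import sys
import traceback


def loud(fn, stream=None):
    """Hypothetical wrapper: print the traceback in the worker, then re-raise."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            out = stream if stream is not None else sys.stderr
            traceback.print_exc(file=out)
            raise  # the Future still carries the exception to the caller

    return wrapper


def divide(a, b):
    return a / b


def run_failing_task():
    # Capture the worker-side output in a buffer so it can be inspected.
    buf = io.StringIO()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(loud(divide, stream=buf), 1, 0)
        exc = fut.exception()
    return exc, buf.getvalue()
```

With the plain standard library executor, nothing is printed unless the caller inspects the Future; the wrapper makes the failure visible at the moment it happens in the worker.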
Get a globally shared “thread pool”, that is, concurrent.futures.ThreadPoolExecutor.
If an executor with the requested name does not exist, it will be created with the specified max_workers argument (or using default if not specified).
If the named executor exists, it will be returned. However, if max_workers is specified but mismatches the “max workers” of the existing executor, ValueError is raised.
User should assign the returned executor to a variable and keep the variable in scope as long as the executor is needed. Once all user references to a named executor have been garbage collected, the executor is gone. When it is requested again, it will be created again.
User should not call shutdown on the returned executor, because it is shared with other users.
This function is thread-safe, meaning it can be called safely in multiple threads with different or the same name.
Example use case: an instance of a class needs to start and use a ThreadPoolExecutor; user may have many such instances live at the same time although they are unlikely to use the ThreadPoolExecutor at the same time; to avoid having too many threads open, the class may choose to use a “shared” thread pool between the instances.
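The lookup rules above (create on first request, return the existing executor afterwards, reject a mismatched max_workers, drop the executor once no user holds it) can be sketched with a lock and a weak-value registry. This is an assumption-laden illustration, not mpservice's code: the function name shared_thread_pool and the default name "default" are hypothetical, and the sketch reads the private CPython attribute _max_workers for the comparison.

```python
import threading
import weakref
from concurrent.futures import ThreadPoolExecutor

# Weak values: an entry disappears once no caller holds the executor anymore.
_pools = weakref.WeakValueDictionary()
_lock = threading.Lock()


def shared_thread_pool(name="default", max_workers=None):
    """Hypothetical sketch of a named, shared thread pool registry."""
    with _lock:  # makes concurrent calls from several threads safe
        pool = _pools.get(name)
        if pool is None:
            pool = ThreadPoolExecutor(max_workers=max_workers)
            _pools[name] = pool
        elif max_workers is not None and max_workers != pool._max_workers:
            # _max_workers is a private CPython attribute, used here
            # only to keep the sketch short.
            raise ValueError(
                f"pool {name!r} already exists with a different max_workers"
            )
        return pool
```

Because the registry holds only weak references, the caller's own variable is what keeps the executor alive, which matches the advice above to keep the returned executor in scope.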
Get a globally shared “process pool”, that is, concurrent.futures.ProcessPoolExecutor.
Analogous to get_shared_thread_pool().
Queue#
- class mpservice.queue.ResponsiveQueue[source]#
Bases: object
- class mpservice.queue.IterableQueue[source]#
Bases: Iterator[Elem]
- __init__(q: Queue | SimpleQueue | Queue | SimpleQueue, *, num_suppliers: int = 1, to_stop: Event | Event | None = None)[source]#
- num_suppliers: number of parties that will supply data elements to the queue by calling put(). The parties are typically in different threads or processes. Each supplier should call put_end() exactly once to indicate it is done adding data.
- to_stop: this is used by other parts of the application to tell this queue to exit (because some error has happened elsewhere), e.g. stop waiting on get or put. If the queue is to be passed between processes, to_stop should be a mpservice.multiprocessing.Event; otherwise, to_stop can be either threading.Event or mpservice.multiprocessing.Event (the latter may be required because the object to_stop needs to be passed between processes in other parts of the user application).
None is used internally as a special indicator. It must not be a valid value in the user application.
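One plausible way for a blocking get to honor a to_stop event is to wait in short intervals and check the event between attempts. The sketch below shows that idea with the standard library only; the names Stopped, get_unless_stopped and poll_interval are hypothetical, and nothing here is claimed about how IterableQueue actually implements it.

```python
import queue
import threading


class Stopped(Exception):
    """Hypothetical exception used only in this sketch."""


def get_unless_stopped(q, to_stop, poll_interval=0.05):
    # Wait in short slices so that a set event is noticed promptly.
    while not to_stop.is_set():
        try:
            return q.get(timeout=poll_interval)
        except queue.Empty:
            continue
    raise Stopped("to_stop was set while waiting on get")
```

The same polling pattern would apply to a blocking put on a bounded queue, with queue.Full in place of queue.Empty.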
Typical use case:
In each of the (one or more) supplier threads or processes, do

    q.put(x)
    q.put(y)
    …
    q.put_end()

In each of the (one or more) consumer threads or processes, do

    for z in q:
        use(z)
The consumers collectively consume the data elements that have been put in the queue.
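The supplier and consumer pattern above relies on None as an end marker, counted once per supplier. The following thread-only sketch shows one way such counting could work with a plain queue.Queue. It is not mpservice's IterableQueue: the class SentinelCountingQueue and the helper run_demo are hypothetical, and features such as to_stop, timeouts, renew and cross-process use are left out.

```python
import queue
import threading


class SentinelCountingQueue:
    """Hypothetical thread-only sketch; not mpservice's IterableQueue."""

    def __init__(self, q, *, num_suppliers=1):
        self._q = q
        self._num_suppliers = num_suppliers
        self._ended = 0
        self._lock = threading.Lock()

    def put(self, x):
        if x is None:
            raise ValueError("None is reserved as the end marker")
        self._q.put(x)

    def put_end(self):
        self._q.put(None)  # one end marker per supplier

    def __iter__(self):
        return self

    def __next__(self):
        while True:
            x = self._q.get()
            if x is not None:
                return x
            with self._lock:
                self._ended += 1
                finished = self._ended >= self._num_suppliers
            if finished:
                # Leave a marker behind so the other consumers stop too.
                self._q.put(None)
                raise StopIteration


def run_demo(num_suppliers=2, num_consumers=2, items_per_supplier=5):
    q = SentinelCountingQueue(queue.Queue(), num_suppliers=num_suppliers)
    seen, seen_lock = [], threading.Lock()

    def supply(start):
        for i in range(start, start + items_per_supplier):
            q.put(i)
        q.put_end()

    def consume():
        for z in q:
            with seen_lock:
                seen.append(z)

    threads = [threading.Thread(target=supply, args=(k * 100,))
               for k in range(num_suppliers)]
    threads += [threading.Thread(target=consume) for _ in range(num_consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(seen)
```

Each element is delivered to exactly one consumer, so the consumers together see every element once; the order across consumers is not defined, which is why the demo sorts the collected values.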
- property maxsize: int#
- put(x: Elem, *, timeout=None) None[source]#
Use this method to put data elements in the queue.
Once finished putting data in the queue (as far as one supplier, such as one thread or process, is concerned), call put_end() exactly once (per supplier).
User should never call put(None). That is reserved to be called by put_end() to indicate the end of one supplier’s data input.
- put_end(*, wait_for_renew: bool = False) None[source]#
Each “supplier” must call this method exactly once, after it is done putting data in the queue. Do not use put(None) for this purpose.
Suppose that, in a certain use case, a queue is populated by a supplier, which has called put_end, and a consumer is designed to call renew after it finishes iterating over the queue, allowing the next round of data population and consumption. Before the consumer calls renew, this object does not forbid the supplier from calling put to add new data elements to the queue (unless the user imposes such a restriction themselves), although these new data elements are not accessible via __next__ or __iter__ until the consumer has called renew.
If the supplier then calls put_end again before the consumer has called renew, the object is in an unexpected state. If wait_for_renew is True, this situation is allowed, and put_end waits until renew has been called before going through. If wait_for_renew is False (the default), an exception is raised in this situation.
- renew()[source]#
This is for special use cases where the queue needs to be “reused” for more than one round of iteration. In those use cases, typically the consumer (or the consuming side, if there is more than one consumer) calls renew exactly once upon finishing iteration over the content of the queue. The suppliers can then put more data into the queue and call put_end as usual once done; the consumer iterates over the queue as if there had been no previous rounds.
This method can only be called after one round of consumption (by iteration) is complete. It cannot be called at the beginning when no data has been placed in the queue, because the implementation does not provide a way to tell that the object is in such a “brand new” state.
The application needs to ensure renew is called only once after one round of iteration.