Utilities for multiprocessing#

The module mpservice.multiprocessing provides some customizations and enhancements to the standard module multiprocessing. Most of the customizations are drop-in replacements.

First, the standard package multiprocessing has a “context”, which has to do with how a process is created and started. Multiprocessing objects like Queue, Event, etc., must be created from a context that matches the process in order to be used with the process. The default context on Linux is a “fork” one. However, it’s recommended to use a “spawn” context. The mpservice.multiprocessing.SpawnContext customizes the standard counterpart.

Second, in well structured code, a spawned process will not get the logging configurations that have been set in the main process. On the other hand, we should definitely not separately configure logging in child processes. The class mpservice.multiprocessing.SpawnProcess addresses this issue by sending log messages of child processes to the main process for handling, all transparently.

Third, one convenience of concurrent.futures compared to multiprocessing is that the former makes it easy to get the results or exceptions of the child process via the object returned from job submission. With multiprocessing, in contrast, we have to pass the results or explicitly captured exceptions to the main process via a queue. The custom SpawnProcess has this covered as well: it can be used in the concurrent.futures way.

Fourth, if an Exception object is pickled, its traceback info is lost. As a consequence, if an exception happens in a child process and, rather than letting the program crash right there, we send it to the main process to be investigated or raised when and where we are ready, we will not have the traceback info. For example, the printout of raise ... in another process will not be very informative. The module mpservice.multiprocessing.remote_exception helps with this.

Besides these fixes to “pain points”, the module mpservice.multiprocessing.server_process provides some new capabilities to the “manager” facility in the standard multiprocessing, especially about “shared memory”.

The “spawn” process and context#

class mpservice.multiprocessing.SpawnContext[source]#

Bases: SpawnContext

The standard package multiprocessing has a “context”, which has to do with how a process is created and started. For example, a ForkContext (or SpawnContext) will create processes by ForkProcess (or SpawnProcess). To use a queue to communicate with the process, the queue needs to be created via ForkContext.Queue (or SpawnContext.Queue), else it will not work.

If you do

import multiprocessing
q = multiprocessing.Queue()

this is equivalent to

q = multiprocessing.get_context().Queue()

multiprocessing.get_context takes the sole parameter method, which on Linux defaults to 'fork'.

However, it is advised to not use this default; rather, always use the “spawn” context. There are some references on this topic; for example, see this article and this StackOverflow thread.

So, multiprocessing code is better written this way:

import multiprocessing
ctx = multiprocessing.get_context('spawn')
q = ctx.Queue(...)
e = ctx.Event(...)
p = ctx.Process(..., args=(q, e))
...

where ctx is an instance of multiprocessing.context.SpawnContext.

mpservice.multiprocessing.SpawnContext inherits from the standard SpawnContext and provides some enhancements. (The main enhancement is to use the custom mpservice.multiprocessing.SpawnProcess for process creation.) The constant mpservice.multiprocessing.MP_SPAWN_CTX points to an instance of this class. With these facilities, the code above can be replaced by this:

from mpservice.multiprocessing import MP_SPAWN_CTX as ctx
q = ctx.Queue(...)
e = ctx.Event(...)
p = ctx.Process(..., args=(q, e))
...

However, there is an annoyance here: ctx.Queue, ctx.Event, and some others are not classes, but rather factory methods. As a result, they cannot be used to annotate the type of the objects created by them. (Similarly, if you do from multiprocessing import Queue, Event, you are getting factory methods, not classes. The actual classes, multiprocessing.queues.Queue and multiprocessing.synchronize.Event, have a required parameter ctx, hence are somewhat inconvenient to use.)
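The factory-versus-class distinction can be checked with the standard library alone. The snippet below is a small sketch that uses only inspect and multiprocessing; the variable names are illustrative and nothing from mpservice is involved.

```python
import inspect
import multiprocessing
import multiprocessing.queues

ctx = multiprocessing.get_context("spawn")

# `ctx.Queue` and `multiprocessing.Queue` are bound methods (factories), not classes.
factory_is_class = inspect.isclass(ctx.Queue)
toplevel_is_class = inspect.isclass(multiprocessing.Queue)

# The actual class lives in `multiprocessing.queues` ...
real_is_class = inspect.isclass(multiprocessing.queues.Queue)

# ... and its keyword-only parameter `ctx` has no default, i.e. it is required.
ctx_param = inspect.signature(multiprocessing.queues.Queue).parameters["ctx"]
ctx_is_required = ctx_param.default is inspect.Parameter.empty

print(factory_is_class, toplevel_is_class, real_is_class, ctx_is_required)
# False False True True
```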

The module mpservice.multiprocessing breaks from the standard multiprocessing API design to alleviate this problem. It provides custom classes Queue, Event, and some others, that have an optional rather than required parameter ctx, which defaults to, you guessed it, mpservice.multiprocessing.MP_SPAWN_CTX. With this, the code above is better yet written this way:

from mpservice.multiprocessing import Queue, Event, Process
q: Queue = Queue(...)
e: Event = Event(...)
p: Process = Process(..., args=(q, e))

Note

Recommendations on the use of MP_SPAWN_CTX: use the classes Process, Manager, Lock, RLock, Condition, Semaphore, BoundedSemaphore, Event, Barrier, Queue, JoinableQueue, SimpleQueue, Pool directly to create objects and type-annotate them; this is preferred over MP_SPAWN_CTX.Process, MP_SPAWN_CTX.Manager, etc., although they would work, too. A few other methods of SpawnContext are not exposed in mpservice.multiprocessing; you may use them via the object MP_SPAWN_CTX.

Process#

alias of SpawnProcess

get_context(method=None)[source]#
class mpservice.multiprocessing.SpawnProcess[source]#

Bases: SpawnProcess

A subclass of the standard multiprocessing.context.SpawnProcess, this customization adds two things:

  1. Make result and exception available as attributes of the process object, hence letting you use a SpawnProcess object similarly to how you use the Future object returned by concurrent.futures.ProcessPoolExecutor.submit.

  2. Make logs in the worker process handled in the main process.

    Logging messages produced in worker processes are tricky. First, some settings belong in the main process only, including log formatting, log-level control, and log handlers (destinations). Specifically, these should be settled in the “launching script”, and definitely should not be configured in worker processes. Second, the terminal printout of logs from multiple processes tends to be intermingled and out of order.

    This class uses a queue to transmit all logging messages that are produced in the worker process to the main process/thread, to be handled there.
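The queue-based forwarding described above can be sketched with the standard library's QueueHandler and QueueListener. This is only an illustration of the general technique, not necessarily how SpawnProcess implements it. To keep it self-contained, a thread stands in for the child process and queue.Queue stands in for a multiprocessing queue; ListHandler and the logger name are hypothetical.

```python
import logging
import logging.handlers
import queue
import threading


class ListHandler(logging.Handler):
    """Main-side handler that collects formatted lines (stand-in for a console or file handler)."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


def worker(log_queue):
    # Worker side: no format or destination decisions, just ship records to the queue.
    logger = logging.getLogger("worker.demo")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.warning("worker warning")
    logger.debug("worker debug")


# Main side: format and destination are configured here only.
main_handler = ListHandler()
main_handler.setFormatter(logging.Formatter("[%(levelname)s; %(name)s] %(message)s"))

log_queue = queue.Queue()  # a real setup would use a multiprocessing queue
listener = logging.handlers.QueueListener(log_queue, main_handler)
listener.start()

t = threading.Thread(target=worker, args=(log_queue,))  # stands in for the child process
t.start()
t.join()
listener.stop()  # handles everything still in the queue, then stops

print(main_handler.lines)
```

The worker's messages come out in the main side's format, including the debug-level one, because the level and format decisions are made where the listener runs.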

This class is aliased by mpservice.multiprocessing.Process. It is preferred to import by the aliased name.

Examples

Let’s use an example to show the logging behavior. First, use a spawn-context from the standard multiprocessing:

 1# log.py
 2import logging
 3import multiprocessing as mp
 4from mpservice.multiprocessing import SpawnProcess
 5
 6def worker():
 7    logging.getLogger('worker.error').error('worker error')
 8    logging.getLogger('worker.warn').warning('worker warning')
 9    logging.getLogger('worker.info').info('worker info')
10    logging.getLogger('worker.debug').debug('worker debug')
11
12def main():
13    logging.getLogger('main.error').error('main error')
14    logging.getLogger('main.info').info('main info')
15    p = mp.get_context('spawn').Process(target=worker)
16    p.start()
17    p.join()
18    logging.getLogger('main.warn').warning('main warning')
19    logging.getLogger('main.debug').debug('main debug')
20
21if __name__ == '__main__':
22    logging.basicConfig(
23        format='[%(asctime)s.%(msecs)02d; %(levelname)s; %(name)s; %(funcName)s, %(lineno)d] [%(processName)s]  %(message)s',
24        level=logging.DEBUG,
25    )
26    main()

Run it:

$ python log.py
[2022-12-20 17:29:54,386.386; ERROR; main.error; main, 15] [MainProcess]  main error
[2022-12-20 17:29:54,386.386; INFO; main.info; main, 16] [MainProcess]  main info
worker error
worker warning
[2022-12-20 17:29:54,422.422; WARNING; main.warn; main, 20] [MainProcess]  main warning
[2022-12-20 17:29:54,423.423; DEBUG; main.debug; main, 21] [MainProcess]  main debug

Clearly, the child process exhibits the default behavior—print the warning-and-above-level log messages to the console—unaware of the logging configuration set in the main process. This is a show stopper.

On line 15, replace mp.get_context('spawn').Process by SpawnProcess. Run it again:

$ python log.py
[2022-12-20 17:39:31,284.284; ERROR; main.error; main, 15] [MainProcess]  main error
[2022-12-20 17:39:31,284.284; INFO; main.info; main, 16] [MainProcess]  main info
[2022-12-20 17:39:31,321.321; ERROR; worker.error; worker, 8] [SpawnProcess-1]  worker error
[2022-12-20 17:39:31,321.321; WARNING; worker.warn; worker, 9] [SpawnProcess-1]  worker warning
[2022-12-20 17:39:31,321.321; INFO; worker.info; worker, 10] [SpawnProcess-1]  worker info
[2022-12-20 17:39:31,322.322; DEBUG; worker.debug; worker, 11] [SpawnProcess-1]  worker debug
[2022-12-20 17:39:31,327.327; WARNING; main.warn; main, 20] [MainProcess]  main warning
[2022-12-20 17:39:31,327.327; DEBUG; main.debug; main, 21] [MainProcess]  main debug

This time, logs in the child process respect the level and format configurations set in the main process (because they are sent to and handled in the main process).

__init__(*args, kwargs=None, **moreargs)[source]#
Parameters:
*args

Positional arguments passed on to the standard Process.

kwargs

Passed on to the standard Process.

**moreargs

Additional keyword arguments passed on to the standard Process.

start()[source]#
static handle_exception(exc)[source]#
run()[source]#

Overrides the standard Process.run.

start arranges for this to be run in a child process.

join(timeout=None)[source]#

Same behavior as the standard lib, except that if the process terminates with an exception, the exception is raised.

done() → bool[source]#

Return True if the process has terminated normally or with exception. Return False if the process is running or not yet started.

result(timeout: float | int | None = None)[source]#

Behavior is similar to concurrent.futures.Future.result.

exception(timeout: float | int | None = None)[source]#

Behavior is similar to concurrent.futures.Future.exception.
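For reference, the Future behavior that result and exception are modeled on looks like this. The sketch uses a standard thread pool rather than SpawnProcess, and the functions double and fail are made up for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def double(x):
    return x * 2


def fail(x):
    raise ValueError(x)


with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(double, 3)
    bad = pool.submit(fail, 38)

    value = ok.result(timeout=5)        # the function's return value
    no_error = ok.exception(timeout=5)  # None, because the call succeeded
    error = bad.exception(timeout=5)    # the exception object, returned rather than raised

    try:
        bad.result(timeout=5)           # result() re-raises the exception
    except ValueError as e:
        reraised = e

print(value, no_error, repr(error))
```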

Standard classes with customizations#

The following classes are subclasses of their counterparts in the standard multiprocessing. The typical customization is that by default they use SpawnContext as the “context”. In addition, the queue types (Queue, SimpleQueue, JoinableQueue) are made Generic to enable annotating the type of the elements contained in them.
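The general pattern (subclass the standard class, make ctx optional, and add Generic for element typing) might look roughly like the sketch below. TypedQueue is a hypothetical name, and the actual mpservice classes may differ in details; the sketch falls back to the standard spawn context rather than MP_SPAWN_CTX.

```python
import multiprocessing
import multiprocessing.queues
from typing import Generic, TypeVar

Elem = TypeVar("Elem")


class TypedQueue(multiprocessing.queues.Queue, Generic[Elem]):
    """Hypothetical queue class with an optional `ctx` and a type parameter."""

    def __init__(self, maxsize=0, *, ctx=None):
        if ctx is None:
            # Assumption for this sketch: default to the standard spawn context.
            ctx = multiprocessing.get_context("spawn")
        super().__init__(maxsize, ctx=ctx)


# Because TypedQueue is a real class, it can be used in annotations.
q: TypedQueue[int] = TypedQueue(maxsize=2)
q.put(7)
item = q.get(timeout=5)
q.close()
q.join_thread()
print(item)  # 7
```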

class mpservice.multiprocessing.Barrier[source]#

Bases: Barrier

__init__(*args, ctx=None, **kwargs)[source]#
class mpservice.multiprocessing.Condition[source]#

Bases: Condition

__init__(lock=None, *, ctx=None)[source]#
class mpservice.multiprocessing.Event[source]#

Bases: Event

__init__(*, ctx=None)[source]#
class mpservice.multiprocessing.Lock[source]#

Bases: Lock

__init__(*, ctx=None)[source]#
class mpservice.multiprocessing.RLock[source]#

Bases: RLock

__init__(*, ctx=None)[source]#
class mpservice.multiprocessing.Semaphore[source]#

Bases: Semaphore

__init__(value=1, *, ctx=None)[source]#
class mpservice.multiprocessing.BoundedSemaphore[source]#

Bases: BoundedSemaphore

__init__(value=1, *, ctx=None)[source]#
class mpservice.multiprocessing.Queue[source]#

Bases: Queue, Generic[Elem]

__init__(maxsize=0, *, ctx=None)[source]#
property maxsize#
class mpservice.multiprocessing.SimpleQueue[source]#

Bases: SimpleQueue, Generic[Elem]

__init__(*, ctx=None)[source]#
class mpservice.multiprocessing.JoinableQueue[source]#

Bases: JoinableQueue, Generic[Elem]

__init__(maxsize=0, *, ctx=None)[source]#
class mpservice.multiprocessing.Pool[source]#

Bases: Pool

__init__(*args, context=None, **kwargs)[source]#
mpservice.multiprocessing.Manager#

alias of SyncManager

Waiting for processes to finish#

mpservice.multiprocessing.wait(workers: Sequence[Thread | SpawnProcess], /, timeout=None, return_when='ALL_COMPLETED') → tuple[set[Thread | SpawnProcess], set[Thread | SpawnProcess]][source]#

workers is a sequence of Thread or SpawnProcess that have been started. It can be a mix of the two types.

See concurrent.futures.wait.

mpservice.multiprocessing.as_completed(workers: Sequence[Thread | SpawnProcess], /, timeout=None) → Iterator[Thread | SpawnProcess][source]#

See concurrent.futures.as_completed.

Remote exception#

The class RemoteException is a pickle helper for Exception objects to preserve some traceback info. This is needed because directly calling pickle.dumps on an Exception object will lose its __traceback__ and __cause__ attributes.
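The loss is easy to reproduce with plain pickle. The following standard-library sketch raises a chained exception and checks both attributes before and after a pickle round trip; the function foo and the variable names are illustrative.

```python
import pickle


def foo():
    raise ValueError(38)


try:
    try:
        foo()
    except ValueError as inner:
        raise RuntimeError("wrapped") from inner
except RuntimeError as e:
    err = e

restored = pickle.loads(pickle.dumps(err))

# Before pickling: traceback and cause are both present.
had_traceback = err.__traceback__ is not None
had_cause = isinstance(err.__cause__, ValueError)

# After the round trip: both are gone.
lost_traceback = restored.__traceback__ is None
lost_cause = restored.__cause__ is None

print(had_traceback, had_cause, lost_traceback, lost_cause)
# True True True True
```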

This is designed to be used to send an exception object to another process. It can also be used to pickle-persist an Exception object for some time.

This class preserves some traceback info simply by keeping it as a formatted string during pickling. Once unpickled, the object obtained, say obj, is not an instance of RemoteException, but rather of the original Exception type. The __traceback__ attribute of obj is None, because a traceback object cannot be reconstructed; instead, obj has a __cause__ attribute, which is a custom Exception object (RemoteTraceback) that contains the string-form traceback info, with proper line breaks. Most (if not all) methods and attributes of obj behave the same as those of the original Exception object, except that __traceback__ is gone and __cause__ is added.
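The idea of carrying the traceback as a string and re-attaching it as __cause__ can be sketched with the standard library. This is not RemoteException's actual mechanism: pack, unpack, and TracebackText are hypothetical helpers that ship the exception and its formatted traceback side by side.

```python
import pickle
import traceback


class TracebackText(Exception):
    """Hypothetical stand-in for RemoteTraceback: carries a formatted traceback."""

    def __str__(self):
        return self.args[0]


def pack(exc):
    # Sender side: format the traceback while it still exists, then pickle both parts.
    tb_text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return pickle.dumps((exc, tb_text))


def unpack(data):
    # Receiver side: the exception keeps its original type; the text rides along as __cause__.
    exc, tb_text = pickle.loads(data)
    exc.__cause__ = TracebackText(tb_text)
    return exc


def foo():
    raise ValueError(38)


try:
    foo()
except ValueError as e:
    received = unpack(pack(e))

print(type(received).__name__)  # ValueError
print(received.__cause__)       # the original traceback, as text
```

Raising received would then print the original traceback first, followed by "The above exception was the direct cause of the following exception".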

Note

type(pickle.loads(pickle.dumps(RemoteException(e)))) is not RemoteException, but rather type(e).

This design is a compromise. Since the object out of unpickling is not an instance of RemoteException, we “lose control of it”, in the sense that we can’t add methods in RemoteException to help on the use of that object. A clear benefit, though, is that we can detect the type of the exception by isinstance or by the type list in try ... except <type_list>, without even knowing about RemoteException.

Note

RemoteException does not subclass BaseException, hence you can’t raise an instance of this class.

The session below shows the basic behaviors of a RemoteException.

>>> from mpservice.multiprocessing.remote_exception import RemoteException, is_remote_exception, get_remote_traceback
>>> import pickle
>>>
>>>
>>> def foo():
...     raise ValueError(38)
>>>
>>>
>>> def gee():
...     foo()
>>>
>>>
>>> gee()
Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "<stdin>", line 2, in gee
    File "<stdin>", line 2, in foo
ValueError: 38
>>>
>>> err = None
>>> try:
...     gee()
... except Exception as e:
...     err = e
>>>
>>> err
ValueError(38)
>>> err.__traceback__  
<traceback object at 0x7...>
>>> err.__cause__ is None
True
>>>
>>> e_remote = RemoteException(err)
>>> e_remote
RemoteException(ValueError(38))
>>> e_pickled = pickle.dumps(e_remote)
>>> e_unpickled = pickle.loads(e_pickled)
>>>
>>> e_unpickled
ValueError(38)
>>> type(e_unpickled)
<class 'ValueError'>
>>> e_unpickled.__traceback__ is None
True
>>> e_unpickled.__cause__  
RemoteTraceback('Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "<stdin>", line 2, in gee
  File "<stdin>", line 2, in foo
ValueError: 38
')
>>>
>>> is_remote_exception(e_unpickled)
True
>>> get_remote_traceback(e_unpickled)  
'Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "<stdin>", line 2, in gee
  File "<stdin>", line 2, in foo
  ValueError: 38
'
>>> print(get_remote_traceback(e_unpickled))  
Traceback (most recent call last):
    File "<stdin>", line 2, in <module>
    File "<stdin>", line 2, in gee
    File "<stdin>", line 2, in foo
ValueError: 38
>>>
>>>
>>> raise e_unpickled
Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
ValueError: 38

Examples#

Let’s use an example to demonstrate the use of RemoteException. First, create a script with the following content:

 1# error.py
 2from mpservice.multiprocessing import MP_SPAWN_CTX, RemoteException
 3
 4def increment(qin, qout):
 5    while True:
 6        x = qin.get()
 7        if x is None:
 8            qout.put(None)
 9            return
10        try:
11            qout.put((x, x + 1))
12        except Exception as e:
13            qout.put((x, e))
14
15def main():
16    qin = MP_SPAWN_CTX.Queue()
17    qout = MP_SPAWN_CTX.Queue()
18    p = MP_SPAWN_CTX.Process(target=increment, args=(qin, qout))
19    p.start()
20    qin.put(1)
21    qin.put(3)
22    qin.put('a')
23    qin.put(5)
24    qin.put(None)
25    p.join()
26    while True:
27        y = qout.get()
28        if y is None:
29            break
30        print(y)
31
32if __name__ == '__main__':
33    main()

Run it:

$ python error.py
(1, 2)
(3, 4)
('a', TypeError('can only concatenate str (not "int") to str'))
(5, 6)

Everything is as expected. Now, instead, we want to stop the program upon errors, so we change line 30 to

if isinstance(y[1], BaseException):
    raise y[1]
print(y)

Note that the line numbers at the bottom shift a bit. Now raise y[1] is on line 31, while the main() call is on line 35. Run it again:

$ python error.py
(1, 2)
(3, 4)
Traceback (most recent call last):
  File "error.py", line 35, in <module>
    main()
  File "error.py", line 31, in main
    raise y[1]
TypeError: can only concatenate str (not "int") to str

Looks good?

Not really. The traceback says the exception happened on line 31 with raise y[1]. That’s not very useful. We get no info about where it actually happened.

What’s the problem? Well, on line 13, the TypeError object e gets put in a multiprocessing queue, pickled; on line 27, the object gets taken out of the queue, unpickled. In the process, pickle.dumps(e) strips off the attribute e.__traceback__ because traceback is not picklable!

One solution is to change line 13 to qout.put((x, RemoteException(e))). Run it again:

$ python error.py
(1, 2)
(3, 4)
mpservice._remote_exception.RemoteTraceback: Traceback (most recent call last):
  File "/home/docker-user/mpservice/tests/experiments/error.py", line 11, in increment
    qout.put((x, x + 1))
TypeError: can only concatenate str (not "int") to str


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "error.py", line 35, in <module>
    main()
  File "error.py", line 31, in main
    raise y[1]
TypeError: can only concatenate str (not "int") to str

This time, we get the exact line number where the error actually happened in the child process.

If we need to pass an Exception object through multiple processes, we need to remember that a RemoteException object, once pickled and then unpickled, gives rise to an object of the original Exception class, not of RemoteException. For any Exception object, follow this rule:

Note

Before putting any Exception object in a multiprocessing Queue, wrap it by RemoteException.

The script is revised to show this idea:

 1# error.py
 2from mpservice.multiprocessing import MP_SPAWN_CTX, RemoteException
 3
 4def increment(qin, qout):
 5    while True:
 6        x = qin.get()
 7        if x is None:
 8            qout.put(None)
 9            return
10        try:
11            qout.put((x, x + 1))
12        except Exception as e:
13            qout.put((x, RemoteException(e)))
14
15def worker(qin, qout):
16    q = MP_SPAWN_CTX.Queue()
17    p = MP_SPAWN_CTX.Process(target=increment, args=(qin, q))
18    p.start()
19    while True:
20        y = q.get()
21        if y is None:
22            qout.put(y)
23            break
24        # ... do other things ...
25        if isinstance(y[1], BaseException):
26            qout.put((y[0], RemoteException(y[1])))
27        else:
28            qout.put(y)
29    p.join()
30
31def main():
32    qin = MP_SPAWN_CTX.Queue()
33    qout = MP_SPAWN_CTX.Queue()
34    p = MP_SPAWN_CTX.Process(target=worker, args=(qin, qout))
35    p.start()
36    qin.put(1)
37    qin.put(3)
38    qin.put('a')
39    qin.put(5)
40    qin.put(None)
41    p.join()
42    while True:
43        y = qout.get()
44        if y is None:
45            break
46        if isinstance(y[1], BaseException):
47            raise y[1]
48        print(y)
49
50if __name__ == '__main__':
51    main()

Run it:

$ python error.py
(1, 2)
(3, 4)
mpservice._remote_exception.RemoteTraceback: Traceback (most recent call last):
  File "/home/docker-user/mpservice/tests/experiments/error.py", line 11, in increment
    qout.put((x, x + 1))
TypeError: can only concatenate str (not "int") to str


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "error.py", line 51, in <module>
    main()
  File "error.py", line 47, in main
    raise y[1]
TypeError: can only concatenate str (not "int") to str

class mpservice.multiprocessing.remote_exception.RemoteException[source]#

Bases: object

__init__(exc: BaseException, tb: TracebackType | str | None = None)[source]#
Parameters:
exc

A BaseException object.

tb

Optional traceback info, if not already carried by exc.

exc#

This is still the original Exception object with traceback and everything. If you are holding a RemoteException object, it cannot have gone through pickling (a RemoteException object does not survive pickling; unpickling yields an object of the original exception type), hence you can use its exc attribute directly.

exception mpservice.multiprocessing.remote_exception.RemoteTraceback[source]#

Bases: Exception

This class is used by RemoteException. End users do not use this class directly.

__init__(tb: str)[source]#
mpservice.multiprocessing.remote_exception.is_remote_exception(e) → bool[source]#

Return True if the exception e was created by RemoteException, and False otherwise.

mpservice.multiprocessing.remote_exception.get_remote_traceback(e) → str[source]#

e must be an exception for which is_remote_exception() returns True. Suppose an Exception object is wrapped by RemoteException and sent to another process through a queue. Once taken out of the queue in the other process, the object passes the is_remote_exception() check, and this function applied to it returns the traceback info as a string.

Server Process#

The module mpservice.multiprocessing.server_process contains some fixes and enhancements to the standard module multiprocessing.managers.

For general understanding of the standard module, see this blog. Most of the fixes are discussed in this blog. Most of the enhancements are discussed in this blog.

The most visible enhancements are:

  • If a proxy method fails in the server, the same exception is raised on the client side with useful traceback.

  • Server-side proxies via function managed.

  • Support for shared memory blocks.

Basic workflow#

  1. First, register one or more classes with the ServerProcess class:

    class Doubler:
        def __init__(self, ...):
            ...
    
        def double(self, x):
            return x * 2
    
    class Tripler:
        def __init__(self, ...):
            ...
    
        def triple(self, x):
            return x * 3
    
    ServerProcess.register('Doubler', Doubler)
    ServerProcess.register('Tripler', Tripler)
    
  2. Then, start a “server process” object in a contextmanager:

    with ServerProcess() as server:
        ...
    

    By now, server has started a “server process”. We create things in the server process via methods that have been “registered” on server. Then we interact with these “remote” things via their “proxies”.

    Create instances of the registered classes within the server process:

    doubler = server.Doubler(...)
    tripler = server.Tripler(...)
    

    This causes instances of the corresponding classes to be created in the server process; the returned objects are “proxies” for the real objects. These proxies can be passed to any other processes and used there.

    The arguments in the above calls are passed into the server process and used in the __init__ methods of the corresponding classes. For this reason, the parameters to __init__ of a registered class must all be pickle-able.

    Calling a registered class multiple times, like

    prox1 = server.Doubler(...)
    prox2 = server.Doubler(...)
    

    will create independent objects in the server process.

    Multiple ServerProcess objects would run their corresponding server processes independently.

  3. Pass the proxy objects to any process and use them there.

    By default, public methods (minus “properties”) defined by the registered classes can be invoked on a proxy and will return the expected result. For example,

    prox1.double(3)
    

    will return 6. Inputs and output of the method Doubler.double must all be pickle-able.

    The computation of the proxy’s methods happens in the server process. The proxy object only handles the communication and input/output transport.

    Between the server process and the proxy object in a particular client process or thread, a connection is established, which starts a new thread in the server process to handle all requests from that proxy object.

    Consequently, calls on a particular method of the proxy from multiple processes or threads become multi-threaded concurrent calls in the server process. We can design a simple example to observe this effect:

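    import time

    from mpservice.multiprocessing.server_process import ServerProcess
    from mpservice.streamer import Stream  # assumed import path for Stream

    class Doubler: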
        def do(self, x):
            time.sleep(0.5)
            return x + x
    
    ServerProcess.register('Doubler', Doubler)
    
    def main():
        with ServerProcess() as server:
            doubler = server.Doubler()
    
            ss = Stream(range(100)).parmap(doubler.do, executor='thread', concurrency=50)
            t0 = time.perf_counter()
            zz = list(ss)
            t1 = time.perf_counter()
            print(t1 - t0)
            assert zz == [x + x for x in range(100)]
    
    if __name__ == '__main__':
        main()
    

    If the calls to doubler.do were sequential, then 100 calls would take 50 seconds. With concurrent calls in 50 threads as above, it took 1.05 seconds in an experiment.

    It follows that, if the method mutates some shared state, you may need to use locks to guard things.
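The need for a lock can be illustrated without a server process. In the sketch below, threads from a standard thread pool stand in for the per-connection threads of the server process, and Counter is a hypothetical class of the kind one might register; ServerProcess itself is not used.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Counter:
    def __init__(self):
        self._lock = threading.Lock()
        self.total = 0

    def add(self, x):
        # Read-modify-write on shared state; the lock makes it atomic across threads.
        with self._lock:
            current = self.total
            time.sleep(0.001)  # widen the race window for demonstration
            self.total = current + x
        return x


counter = Counter()
with ThreadPoolExecutor(max_workers=20) as pool:
    list(pool.map(counter.add, range(100)))

print(counter.total)  # 4950
```

Without the lock, concurrent calls could read the same value of total and overwrite each other's updates, so the final sum would typically come out too small.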

Shared memory#

“Shared memory”, introduced in Python 3.8, provides a way to use a block of memory across processes with zero-copy. ServerProcess has a method MemoryBlock() that returns a MemoryBlockProxy object:

with ServerProcess() as server:
    mem = server.MemoryBlock(1000)
    buffer = mem.buf  # memoryview
    # ... write data into `buffer`
    # pass `mem` to other processes and use its `.buf` again for the content.
    # Since it's a "shared" block of memory, any process can modify the data
    # via the memoryview.

Usually you will use the three properties name, size, and buf of MemoryBlockProxy. The property buf returns a memoryview of the shared memory.

The block of shared memory is released/destroyed once all references to the MemoryBlockProxy object have been garbage collected, or when the server process terminates.
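For background, the underlying standard-library facility can be used directly as sketched below. This shows multiprocessing.shared_memory only, not MemoryBlock or MemoryBlockProxy; a second handle in the same process stands in for another process attaching by name, and the cleanup here is manual.

```python
from multiprocessing import shared_memory

# Create a block; other processes would attach to it by name.
block = shared_memory.SharedMemory(create=True, size=1000)
try:
    block.buf[:5] = b"hello"

    # A second handle attached by name sees the same bytes; nothing is copied over.
    view = shared_memory.SharedMemory(name=block.name)
    try:
        seen = bytes(view.buf[:5])
        view.buf[0] = ord("j")          # a write through one handle ...
        changed = bytes(block.buf[:5])  # ... is visible through the other
    finally:
        view.close()
finally:
    block.close()
    block.unlink()  # release the block once nobody needs it

print(seen, changed)  # b'hello' b'jello'
```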

Nested proxies#

The standard Manager allows “nested proxies” in a client process (not the “server process”). For example,

with ServerProcess() as server:
    lst = server.list()
    dct = server.dict()
    dct['a'] = 3
    dct['b'] = 4
    lst.append(dct)

Now, the first element of lst is a dict proxy. You may pass lst to another process and manipulate its first element; the changes are reflected in the dict that resides in the server process.

Server-side proxies#

By default, when you call a method of a proxy, the returned value is plain old data passed from the server process to the caller in the client process via pickling. Once received, that value has no relation to the server process.

The standard Manager allows a method of a registered class to return a proxy by instructions via the parameter method_to_typeid. However, the setup can be inconvenient, and the capability is limited.

ServerProcess supports returning a mix of proxy and non-proxy data elements in convenient and powerful ways. Suppose we have a class Worker, which we will run in a server process; further, a method of Worker will return a dict that is plain data in some parts and proxies in others. We can do something like this:

from mpservice.multiprocessing.server_process import MemoryBlock, ServerProcess, managed_memoryblock, managed_list

class Worker:
    def make_data(self):
        return {
            'name': 'tom',
            'hobbies': ['soccer', 'swim'],
            'mem': managed_memoryblock(MemoryBlock(100)),
            'jobs': ('IBM', 'APPLE', {'UK': managed_list(['ARM', 'Shell'])}),
        }

    def get_memory(self, size):
        return managed_memoryblock(MemoryBlock(size))

ServerProcess.register('Worker', Worker)

Then we may use it like this:

from mpservice.multiprocessing import Process
from mpservice.multiprocessing.server_process import MemoryBlockProxy, ServerProcess

def agent(data):
    assert data['name'] == 'tom'
    data['hobbies'].append('tennis')
    data['mem'].buf[3] = 8
    data['jobs'][2]['UK'].append('BP')

with ServerProcess() as server_process:
    worker = server_process.Worker()
    data = worker.make_data()
    p = Process(target=agent, args=(data, ))
    p.start()
    p.join()

    # data['hobbies'] is not hosted; its modification in one process
    # will not be reflected in other processes.
    assert len(data['hobbies']) == 2

    assert data['mem'].buf[3] == 8
    assert data['jobs'][2]['UK'][2] == 'BP'

    mem = worker.get_memory(80)
    assert isinstance(mem, MemoryBlockProxy)
    assert mem.size == 80

class mpservice.multiprocessing.server_process.ServerProcess[source]#

Bases: BaseManager

__init__(address=None, authkey=None, serializer='pickle', ctx=None, *, name: str | None = None, cpu: int | list[int] | None = None, **kwargs)[source]#
Parameters:
name

Name of the server process. If None, a default name will be created.

cpu

Specify CPU pinning for the server process.

start(*args, **kwargs)[source]#
classmethod register(typeid, callable=None, proxytype=None, *, method_to_typeid=None, create_method=True)[source]#
classmethod unregister(typeid)[source]#
Array(*args, **kwds)#
MemoryBlock(*args, **kwds)#
Namespace(*args, **kwds)#
Value(*args, **kwds)#
dict(*args, **kwds)#
list(*args, **kwds)#
class mpservice.multiprocessing.server_process.MemoryBlock[source]#

Bases: object

This class provides a simple tracker for a block of shared memory.

Users are expected to use this class ONLY within a server process, and should wrap the object with managed_memoryblock.

Outside of a server process, use the MemoryBlock() method of a ServerProcess object instead.

__init__(size: int)[source]#
property name#
property size#
property buf#
class mpservice.multiprocessing.server_process.MemoryBlockProxy[source]#

Bases: BaseProxy

__init__(*args, name: str | None = None, size: int | None = None, **kwargs)[source]#
property name#

Return the name of the SharedMemory object.

property size: int#

Return size of the memory block in bytes.

property buf: memoryview#

Return a memoryview into the content of the memory block.

mpservice.multiprocessing.server_process.managed(obj, *, typeid: str | None = None)[source]#

This function is used within a server process. It creates a proxy for the input obj. The resultant proxy can be sent back to a requester outside of the server process. The proxy may also be saved in another data structure (either residing in the server or sent back to the client), e.g. as an element in a managed list or dict.

Runner#