Source code for mpservice.multiprocessing.context

from __future__ import annotations

import concurrent.futures
import errno
import logging
import logging.handlers
import multiprocessing
import multiprocessing.connection
import multiprocessing.context
import multiprocessing.managers
import multiprocessing.pool
import multiprocessing.queues
import multiprocessing.synchronize
import multiprocessing.util
import os
import signal
import sys
import time
import traceback

from .._common import TimeoutError
from ..threading import Thread
from .remote_exception import RemoteException

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


[docs] class SpawnProcess(multiprocessing.context.SpawnProcess): """ A subclass of the standard ``multiprocessing.context.SpawnProcess``, this customization adds two things: 1. Make result and exception available as attributes of the process object, hence letting you use a ``SpawnProcess`` object similarly to how you use the ``Future`` object returned by `concurrent.futures.ProcessPoolExecutor.submit <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.submit>`_. 2. Make logs in the worker process handled in the main process. Logging messages produced in worker processes are tricky. First, some settings should be concerned in the main process only, including log formatting, log-level control, log handler (destination), etc. Specifically, these should be settled in the "launching script", and definitely should not be concerned in worker processes. Second, the terminal printout of loggings in multiple processes tends to be intermingled and mis-ordered. This class uses a queue to transmit all logging messages that are produced in the worker process to the main process/thread, to be handled there. This class is aliased by ``mpservice.multiprocessing.Process``. It is preferred to import by the aliased name. Examples -------- Let's use an example to show the logging behavior. First, use a spawn-context from the standard `multiprocessing`_: .. 
code-block:: python :linenos: # log.py import logging import multiprocessing as mp from mpservice.multiprocessing import SpawnProcess def worker(): logging.getLogger('worker.error').error('worker error') logging.getLogger('worker.warn').warning('worker warning') logging.getLogger('worker.info').info('worker info') logging.getLogger('worker.debug').debug('worker debug') def main(): logging.getLogger('main.error').error('main error') logging.getLogger('main.info').info('main info') p = mp.get_context('spawn').Process(target=worker) p.start() p.join() logging.getLogger('main.warn').warning('main warning') logging.getLogger('main.debug').debug('main debug') if __name__ == '__main__': logging.basicConfig( format='[%(asctime)s.%(msecs)02d; %(levelname)s; %(name)s; %(funcName)s, %(lineno)d] [%(processName)s] %(message)s', level=logging.DEBUG, ) main() Run it:: $ python log.py [2022-12-20 17:29:54,386.386; ERROR; main.error; main, 15] [MainProcess] main error [2022-12-20 17:29:54,386.386; INFO; main.info; main, 16] [MainProcess] main info worker error worker warning [2022-12-20 17:29:54,422.422; WARNING; main.warn; main, 20] [MainProcess] main warning [2022-12-20 17:29:54,423.423; DEBUG; main.debug; main, 21] [MainProcess] main debug Clearly, the child process exhibits the default behavior---print the warning-and-above-level log messages to the console---unaware of the logging configuration set in the main process. **This is a show stopper.** On line 15, replace ``mp.get_context('spawn').Process`` by ``SpawnProcess``. 
Run it again:: $ python log.py [2022-12-20 17:39:31,284.284; ERROR; main.error; main, 15] [MainProcess] main error [2022-12-20 17:39:31,284.284; INFO; main.info; main, 16] [MainProcess] main info [2022-12-20 17:39:31,321.321; ERROR; worker.error; worker, 8] [SpawnProcess-1] worker error [2022-12-20 17:39:31,321.321; WARNING; worker.warn; worker, 9] [SpawnProcess-1] worker warning [2022-12-20 17:39:31,321.321; INFO; worker.info; worker, 10] [SpawnProcess-1] worker info [2022-12-20 17:39:31,322.322; DEBUG; worker.debug; worker, 11] [SpawnProcess-1] worker debug [2022-12-20 17:39:31,327.327; WARNING; main.warn; main, 20] [MainProcess] main warning [2022-12-20 17:39:31,327.327; DEBUG; main.debug; main, 21] [MainProcess] main debug This time, logs in the child process respect the level and format configurations set in the main process (because they are sent to and handled in the main process). """
[docs] def __init__(self, *args, kwargs=None, **moreargs): """ Parameters ---------- *args Positional arguments passed on to the standard ``Process``. kwargs Passed on to the standard ``Process``. **moreargs Additional keyword arguments passed on to the standard ``Process``. """ if kwargs is None: kwargs = {} else: kwargs = dict(kwargs) assert '_result_and_error_' not in kwargs assert '_logger_queue_' not in kwargs reader, writer = multiprocessing.connection.Pipe(duplex=False) kwargs['_result_and_error_'] = writer logger_queue = MP_SPAWN_CTX.Queue() kwargs['_logger_queue_'] = logger_queue super().__init__(*args, kwargs=kwargs, **moreargs) assert not hasattr(self, '_result_and_error_') assert not hasattr(self, '_logger_queue_') assert not hasattr(self, '_logger_thread_') assert not hasattr(self, '_collector_thread_') self._result_and_error_ = reader self._logger_queue_ = logger_queue self._logger_thread_ = None
[docs] def start(self): super().start() # The following must be *after* ``super().start``, otherwise will get error # "cannot pickle '_thread.Lock' object" or "cannot pickle '_thread.RLock" because `self` # will be passed to the other process in ``super().start()``, # going through pickling. self._future_ = concurrent.futures.Future() self._logger_thread_ = Thread( target=self._run_logger, args=(self._logger_queue_,), name=f'{self.name}-LoggerThread', daemon=getattr(self, 'daemon', None), ) self._logger_thread_.start() self._result_collector_thread_ = Thread( target=self._collect_result, name=f'{self.name}-ResultCollectorThread', ) self._result_collector_thread_.start() self._finalizer_ = multiprocessing.util.Finalize( self, type(self)._finalize, args=(self._logger_thread_, self._logger_queue_), )
@staticmethod def _run_logger(q: multiprocessing.queues.Queue): while True: record = q.get() if record is None: break logger = logging.getLogger(record.name) if record.levelno >= logger.getEffectiveLevel(): logger.handle(record) def _collect_result(self): result, error = None, None try: result = self._result_and_error_.recv() error = self._result_and_error_.recv() except EOFError as exc: # the process has been terminated by calling ``self.terminate()`` while self.exitcode is None: time.sleep(0.001) exitcode = -self.exitcode if exitcode == errno.ENOTBLK: # 15 # warnings.warn( # f'process exitcode {exitcode}, {errno.errorcode[exitcode]}; likely due to a forced termination by calling `.terminate()`', # stacklevel=2, # ) pass # For example, ``self.terminate()`` was called. That's a code smell. # ``signal.Signals.SIGTERM`` is 15. # ``signal.Signals.SIGKILL`` is 9. # ``signal.Signals.SIGINT`` is 2. else: msg = os.strerror(exitcode) if exitcode == 9: msg += ': possibly out of memory' raise OSError(exitcode, msg) from exc self._logger_queue_.put(None) self._result_and_error_.close() self._result_and_error_ = None if error is not None: self._future_.set_exception(error) else: self._future_.set_result(result) @staticmethod def _finalize(logger_thread, q): q.put(None) logger_thread.join()
[docs] @staticmethod def handle_exception(exc): # Subclass can customize this to log more info. # however, usually you should do that when calling `.join`. # This method must not raise exceptions. print(f'Exception in {multiprocessing.current_process().name}: {repr(exc)}')
[docs] def run(self): """ Overrides the standard ``Process.run``. ``start`` arranges for this to be run in a child process. """ result_and_error = self._kwargs.pop('_result_and_error_') # Upon completion, `result_and_error` will contain `result` and `exception` # in this order; both may be `None`. if not self._target: result_and_error.send(None) result_and_error.send(None) result_and_error.close() return logger_queue = self._kwargs.pop('_logger_queue_') if not logging.getLogger().hasHandlers(): # Set up putting all log messages # ever produced in this process into ``logger_queue``, # The log messages will be consumed # in the main process by ``self._run_logger_thread``. # # During the execution of this process, logging should not be configured. # Logging config should happen in the main process/thread. root = logging.getLogger() root.setLevel(logging.DEBUG) qh = logging.handlers.QueueHandler(logger_queue) root.addHandler(qh) logging.captureWarnings(True) else: # If logger is configured in this process, then do not start log forwarding, # but this is usually not recommended. # This sually happends because logging is configured on the module level rather than # in the ``if __name__ == '__main__':`` block. 
logger_queue.close() logger_queue = None qh = None self._mpservice_exitcode_ = 0 try: z = self._target(*self._args, **self._kwargs) except SystemExit as e: if e.code is None: result_and_error.send(None) result_and_error.send(None) else: if isinstance(e.code, int): if e.code == 0: result_and_error.send(None) result_and_error.send(None) else: self._mpservice_exitcode_ = e.code self.handle_exception(e) result_and_error.send(None) result_and_error.send(RemoteException(e)) else: self._mpservice_exitcode_ = 1 self.handle_exception(e) sys.stderr.write(f'exitcode: {e.code}' + '\n') result_and_error.send(None) result_and_error.send(RemoteException(e)) except BaseException as e: self.handle_exception(e) # This must go before the two lines below, in case # user's custom `handle_exception` writes logs. self._mpservice_exitcode_ = 1 if self.daemon: traceback.print_exc() result_and_error.send(None) result_and_error.send(RemoteException(e)) else: result_and_error.send(z) result_and_error.send(None) finally: result_and_error.close() if qh is not None: logging.getLogger().removeHandler(qh) logger_queue.close()
def _bootstrap(self, parent_sentinel=None): exitcode = super()._bootstrap(parent_sentinel) assert exitcode == 0 return self._mpservice_exitcode_
[docs] def join(self, timeout=None): """ Same behavior as the standard lib, except that if the process terminates with an exception, the exception is raised. """ super().join(timeout=timeout) if not self.done(): # timed out return self._result_collector_thread_.join() # exitcode = self.exitcode # if exitcode is None: # # Not terminated. Timed out. # return # if exitcode == 0: # # Terminated w/o error. # return # if exitcode == 1: # raise self._future_.exception() # if exitcode >= 0: # raise ValueError(f'expecting negative `exitcode` but got {exitcode}') if self._future_.exception(): raise self._future_.exception()
# else: # raise ChildProcessError( # f'child process failed with exitcode {exitcode}, {errno.errorcode[exitcode]}' # ) # For a little more info on the error codes, see # https://www.gnu.org/software/libc/manual/html_node/Error-Codes.html
[docs] def done(self) -> bool: """ Return ``True`` if the process has terminated normally or with exception. Return ``False`` if the process is running or not yet started. """ return self.exitcode is not None
[docs] def result(self, timeout: float | int | None = None): """ Behavior is similar to ``concurrent.futures.Future.result``. """ self.join(timeout) if not self.done(): raise TimeoutError return self._future_.result()
[docs] def exception(self, timeout: float | int | None = None): """ Behavior is similar to ``concurrent.futures.Future.exception``. """ super().join(timeout) if not self.done(): raise TimeoutError self._result_collector_thread_.join() return self._future_.exception()
class SpawnContext(multiprocessing.context.SpawnContext):
    """
    The standard package ``multiprocessing`` has a
    `"context" <https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods>`_,
    which has to do with how a process is created and started.
    For example, a ``ForkContext`` (or ``SpawnContext``) will create processes
    by ``ForkProcess`` (or ``SpawnProcess``).
    To use a queue to communicate with the process, the queue needs to be created
    via ``ForkContext.Queue`` (or ``SpawnContext.Queue``), else it will not work.

    If you do

    ::

        import multiprocessing
        q = multiprocessing.Queue()

    this is equivalent to

    ::

        q = multiprocessing.get_context().Queue()

    `multiprocessing.get_context <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.get_context>`_
    takes the sole parameter ``method``, which **on Linux** defaults to ``'fork'``.

    However, it is advised to **not** use this default; rather, **always** use
    the "spawn" context. There are some references on this topic; for example, see
    `this article <https://pythonspeed.com/articles/python-multiprocessing/>`_
    and `this StackOverflow thread <https://stackoverflow.com/questions/64095876/multiprocessing-fork-vs-spawn>`_.

    So, multiprocessing code is better written this way::

        import multiprocessing
        ctx = multiprocessing.get_context('spawn')
        q = ctx.Queue(...)
        e = ctx.Event(...)
        p = ctx.Process(..., args=(q, e))
        ...

    where ``ctx`` is an instance of ``multiprocessing.context.SpawnContext``.

    ``mpservice.multiprocessing.SpawnContext`` inherits from the standard
    ``SpawnContext`` and provides some enhancements.
    (The main enhancement is to use the custom
    :class:`mpservice.multiprocessing.SpawnProcess` for process creation.)
    The constant ``mpservice.multiprocessing.MP_SPAWN_CTX`` points to an instance
    of this class.

    With these facilities, the code above can be replaced by this::

        from mpservice.multiprocessing import MP_SPAWN_CTX as ctx
        q = ctx.Queue(...)
        e = ctx.Event(...)
        p = ctx.Process(..., args=(q, e))
        ...

    However, there is an annoyance here: ``ctx.Queue``, ``ctx.Event``, and some
    others are not **classes**, but rather **factory methods**. As a result, they
    can not be used to annotate the type of the objects created by them.
    (Similarly, if you do ``from multiprocessing import Queue, Event``, you are
    getting factory methods, not classes.
    The actual classes, ``multiprocessing.queues.Queue`` and
    ``multiprocessing.synchronize.Event``, have a **required** parameter ``ctx``,
    hence are somewhat inconvenient to use.)

    The module ``mpservice.multiprocessing`` breaks from the standard
    ``multiprocessing`` API design to alleviate this problem.
    It provides custom classes :class:`Queue`, :class:`Event`, and some others,
    that have an **optional** rather than required parameter ``ctx``,
    which defaults to, you guessed it, ``mpservice.multiprocessing.MP_SPAWN_CTX``.

    With this, the code above is better yet written this way::

        from mpservice.multiprocessing import Queue, Event, Process
        q: Queue = Queue(...)
        e: Event = Event(...)
        p: Process = Process(..., args=(q, e))

    .. note:: Recommendations on the use of ``MP_SPAWN_CTX``: use the classes
        ``Process``, ``Manager``, ``Lock``, ``RLock``, ``Condition``, ``Semaphore``,
        ``BoundedSemaphore``, ``Event``, ``Barrier``, ``Queue``, ``JoinableQueue``,
        ``SimpleQueue``, ``Pool`` directly to create objects and type-annotate them;
        this is preferred over ``MP_SPAWN_CTX.Process``, ``MP_SPAWN_CTX.Manager``,
        etc, although they would work, too.
        A few other methods of ``SpawnContext`` are not exposed in
        ``mpservice.multiprocessing``; you may use them via the object
        ``MP_SPAWN_CTX``.
    """

    # Processes created through this context use the customized process class.
    Process = SpawnProcess

    def get_context(self, method=None):
        """
        Return this context itself if ``method`` is ``None`` or ``'spawn'``;
        otherwise defer to the standard ``get_context``.
        """
        wants_this_context = method is None or method == 'spawn'
        return self if wants_this_context else super().get_context(method)
# MP_SPAWN_CTX = multiprocessing.context.DefaultContext(SpawnContext()) # The version above would fail `tests/test_streamer.py::test_parmap`. I don't know why. MP_SPAWN_CTX = SpawnContext()