Source code for mpservice.multiprocessing.queues

import multiprocessing
from typing import Generic, TypeVar

from .context import MP_SPAWN_CTX

Elem = TypeVar('Elem')


[docs] class Queue(multiprocessing.queues.Queue, Generic[Elem]):
[docs] def __init__(self, maxsize=0, *, ctx=None): super().__init__(maxsize, ctx=ctx or MP_SPAWN_CTX)
@property def maxsize(self): # `queue.Queue` has attribute `maxsize`. # `multiprocessing.queues.Queue` has attribute `_maxsize`. return self._maxsize
[docs] class JoinableQueue(multiprocessing.queues.JoinableQueue, Generic[Elem]):
[docs] def __init__(self, maxsize=0, *, ctx=None): super().__init__(maxsize, ctx=ctx or MP_SPAWN_CTX)
[docs] class SimpleQueue(multiprocessing.queues.SimpleQueue, Generic[Elem]):
[docs] def __init__(self, *, ctx=None): super().__init__(ctx=ctx or MP_SPAWN_CTX)