# Source code for mpservice.multiprocessing.queues
import multiprocessing
import multiprocessing.queues
from typing import Generic, TypeVar

from .context import MP_SPAWN_CTX
# Type variable used as the `Generic[...]` parameter of the queue classes in this module.
Elem = TypeVar('Elem')
class Queue(multiprocessing.queues.Queue, Generic[Elem]):
    """A ``multiprocessing.queues.Queue`` subclass that is generic over ``Elem``.

    The constructor falls back to ``MP_SPAWN_CTX`` when the caller does not
    supply a context.
    """

    def __init__(self, maxsize=0, *, ctx=None):
        """Initialize the queue.

        :param maxsize: passed through unchanged to the parent constructor.
        :param ctx: context passed to the parent constructor; a falsy value
            (including the default ``None``) is replaced by ``MP_SPAWN_CTX``.
        """
        if not ctx:
            ctx = MP_SPAWN_CTX
        super().__init__(maxsize, ctx=ctx)

    @property
    def maxsize(self):
        """Expose the parent's ``_maxsize`` attribute under a public name."""
        # `queue.Queue` has attribute `maxsize`, while
        # `multiprocessing.queues.Queue` keeps the value in `_maxsize`.
        return self._maxsize
class JoinableQueue(multiprocessing.queues.JoinableQueue, Generic[Elem]):
    """A ``multiprocessing.queues.JoinableQueue`` subclass generic over ``Elem``.

    The constructor falls back to ``MP_SPAWN_CTX`` when the caller does not
    supply a context.
    """

    def __init__(self, maxsize=0, *, ctx=None):
        """Initialize the queue.

        :param maxsize: passed through unchanged to the parent constructor.
        :param ctx: context passed to the parent constructor; a falsy value
            (including the default ``None``) is replaced by ``MP_SPAWN_CTX``.
        """
        super().__init__(maxsize, ctx=ctx or MP_SPAWN_CTX)

    @property
    def maxsize(self):
        """Expose the parent's ``_maxsize`` attribute under a public name.

        Mirrors the ``maxsize`` property of this module's ``Queue`` class so
        both bounded queue types offer the same read-only attribute.
        """
        # `queue.Queue` has attribute `maxsize`, while the
        # `multiprocessing.queues` classes keep the value in `_maxsize`.
        return self._maxsize
class SimpleQueue(multiprocessing.queues.SimpleQueue, Generic[Elem]):
    """A ``multiprocessing.queues.SimpleQueue`` subclass generic over ``Elem``.

    The constructor falls back to ``MP_SPAWN_CTX`` when the caller does not
    supply a context.
    """

    def __init__(self, *, ctx=None):
        """Initialize the queue.

        :param ctx: context passed to the parent constructor; a falsy value
            (including the default ``None``) is replaced by ``MP_SPAWN_CTX``.
        """
        if not ctx:
            ctx = MP_SPAWN_CTX
        super().__init__(ctx=ctx)