Source code for mpservice.threading
from __future__ import annotations
__all__ = [
'InvalidStateError',
'Thread',
'FIRST_COMPLETED',
'FIRST_EXCEPTION',
'ALL_COMPLETED',
'wait',
'as_completed',
]
import concurrent.futures
import ctypes
import threading
import traceback
from collections.abc import Iterator, Sequence
from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
from typing import Type
# Overhead of Thread:
# sequentially creating/running/joining
# threads with a trivial worker function:
# 20000 took 1 sec.
# https://stackoverflow.com/questions/36484151/throw-an-exception-into-another-thread
from .._common import TimeoutError
class InvalidStateError(RuntimeError):
pass
[docs]
class Thread(threading.Thread):
"""
A subclass of the standard ``threading.Thread``,
this class makes the result or exception produced in a thread
accessible from the thread object itself. This makes the ``Thread``
object's behavior similar to the ``Future`` object returned
by ``concurrent.futures.ThreadPoolExecutor.submit``.
.. seealso:: :class:`mpservice.multiprocessing.SpawnProcess`
"""
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._future_: concurrent.futures.Future = None
[docs]
@staticmethod
def handle_exception(exc):
# Subclass can customize this to log more info,
# however, usually you should do that when calling `.join`.
# This method should not raise exceptions.
print(f'Exception in {threading.current_thread().name}: {repr(exc)}')
[docs]
def run(self):
"""
This method represents the thread's activity.
"""
self._future_ = concurrent.futures.Future()
try:
if self._target is not None:
z = self._target(*self._args, **self._kwargs)
self._future_.set_result(z)
else:
self._future_.set_result(None)
except SystemExit as e:
# TODO: what if the code has created other threads?
if e.code is None:
self._future_.set_result(None)
else:
if isinstance(e.code, int):
if e.code == 0:
self._future_.set_result(None)
else:
self.handle_exception(e)
self._future_.set_exception(e)
else:
self.handle_exception(e)
self._future_.set_exception(e)
except BaseException as e:
self.handle_exception(e)
tb = ''.join(traceback.format_exception(type(e), e, e.__traceback__))
tb = f'[{threading.current_thread().name}] ' + tb
e.__cause__ = type(e)(tb)
e.__traceback__ = None
self._future_.set_exception(e)
# Sometimes somehow error is not visible (maybe it's a `pytest` issue?).
if self._daemonic:
traceback.print_exc()
# In non-daemonic thread, let user control printing of exception
# when they call `join`.
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs
[docs]
def join(self, timeout=None):
"""
Same behavior as the standard lib, except that if the thread
terminates with an exception, the exception is raised.
"""
super().join(timeout=timeout)
if self.is_alive():
# Timed out
return
if self._future_.exception():
raise self._future_.exception()
[docs]
def done(self) -> bool:
"""
Return ``True`` if the thread has terminated.
Return ``False`` if the thread is running or not yet started.
"""
if self.is_alive():
return False
return self._started.is_set()
# Otherwise, not started yet.
[docs]
def result(self, timeout=None):
"""
Behavior is similar to ``concurrent.futures.Future.result``.
"""
super().join(timeout)
if self.is_alive():
raise TimeoutError
return self._future_.result()
[docs]
def exception(self, timeout=None):
"""
Behavior is similar to ``concurrent.futures.Future.exception``.
"""
super().join(timeout)
if self.is_alive():
raise TimeoutError
return self._future_.exception()
[docs]
def throw(self, exc: BaseException | Type[BaseException]) -> None:
"""
Raise exception ``exc`` inside the thread.
Do not use this unless you know what you're doing.
A main intended user is :meth:`terminate`.
"""
# References:
# https://gist.github.com/liuw/2407154
# https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc
# https://stackoverflow.com/a/65090035/6178706
if not self.is_alive():
raise InvalidStateError('The thread is not running')
tid = self.ident
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_ulong(tid), ctypes.py_object(exc)
)
if ret == 0:
raise ValueError(f'Invalid thread ID {tid}')
elif ret > 1:
# Huh? Why would we notify more than one threads?
# Because we punch a hole into C level interpreter.
# So it is better to clean up the mess.
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError('PyThreadState_SetAsyncExc failed')
[docs]
def terminate(self) -> None:
"""
There are many pitfalls in terminating a thread from outside.
Use extra caution if the thread code is complex, e.g. if it creates
more threads.
"""
# There's a chance that the exception is missed in the thread.
# I read about it but now can't find the reference.
while self.is_alive():
self.throw(SystemExit)
super().join(0.01)
[docs]
def wait(
threads: Sequence[Thread], timeout=None, return_when=ALL_COMPLETED
) -> tuple[set[Thread], set[Thread]]:
"""See ``concurrent.futures.wait``."""
futures = [t._future_ for t in threads]
future_to_thread = {id(t._future_): t for t in threads}
done, not_done = concurrent.futures.wait(
futures, timeout=timeout, return_when=return_when.upper()
)
if done:
done = set(future_to_thread[id(f)] for f in done)
if not_done:
not_done = set(future_to_thread[id(f)] for f in not_done)
return done, not_done
[docs]
def as_completed(threads: Sequence[Thread], timeout=None) -> Iterator[Thread]:
"""See ``concurrent.futures.as_completed``."""
futures = [t._future_ for t in threads]
future_to_thread = {id(t._future_): t for t in threads}
for f in concurrent.futures.as_completed(futures, timeout=timeout):
yield future_to_thread[id(f)]