Serving mpserver#
Serving using http#
The module mpservice.http provides simple utilities for serving mpservice.mpserver.AsyncServer over HTTP
using uvicorn and starlette.
This utility code is not directly connected to AsyncServer, because AsyncServer simply
provides the method call() that can be called from an HTTP
request handler function; AsyncServer itself has nothing to do with HTTP.
The example below shows one way to wire things up:
# "example.py"
import asyncio
import contextlib
import os

from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse, JSONResponse

from mpservice.mpserver import AsyncServer
from mpservice.http import start_server, stop_server


async def handle_request(request):
    ...
    ...
    result = await request.state.model.call(...)
    ...
    return JSONResponse(...)


async def stop(request):
    await stop_server()
    return PlainTextResponse("server shutdown as requested", status_code=200)


@contextlib.asynccontextmanager
async def lifespan(app):
    async with AsyncServer(...) as model:
        yield {'model': model}  # available in endpoint functions via `request.state`


app = Starlette(lifespan=lifespan,
                routes=[
                    Route('/', handle_request),
                    Route('/stop', stop, methods=['POST']),
                ])

if __name__ == '__main__':
    start_server('example:app')
As demonstrated, you can set up (async) context managers and other things
in lifespan and make things available via the lifespan’s “state” as needed.
The app received by lifespan has an attribute worker_context
(which is added by our customization) that can be used to pass config-style
data from the main process (where start_server is called) to the worker processes.
The use of starlette is very lightweight: it just handles HTTP
routing and request acceptance/response.
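The way lifespan can pick up worker_context and expose it through its “state” can be sketched without starting a server. The following is a minimal illustration, not code from mpservice: a SimpleNamespace object stands in for the Starlette app, and the names simulate_startup, model_name, and resource are hypothetical.

```python
import asyncio
import contextlib
from types import SimpleNamespace


@contextlib.asynccontextmanager
async def lifespan(app):
    # `worker_context` is the attribute described above; fall back to an
    # empty dict so the sketch also works when nothing was passed.
    config = getattr(app, "worker_context", None) or {}
    # Hypothetical "resource" initialized from the config.
    resource = {"model_name": config.get("model_name", "default")}
    # In a real app, what is yielded here is reachable via `request.state`.
    yield {"config": config, "resource": resource}


async def simulate_startup(worker_context):
    # Stand-in for the Starlette app object; only the attribute matters here.
    fake_app = SimpleNamespace(worker_context=worker_context)
    async with lifespan(fake_app) as state:
        return state


state = asyncio.run(simulate_startup({"model_name": "demo"}))
print(state["resource"]["model_name"])
```

In a real service the same lifespan function would be passed to Starlette(lifespan=...), and the stand-in object would not be needed.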
- mpservice.http.start_server(app: ASGIApplication | str, *, host: str = '0.0.0.0', port: int = 8000, access_log: bool = False, backlog: int = 128, workers: int = 1, worker_contexts: list | None = None, log_config=None, server_id: str = '0', **kwargs)[source]#
This function is intended for launching a service that uses mpservice.mpserver.
This function is adapted from uvicorn.main.run.
- Parameters:
- app
Usually a starlette.applications.Starlette instance or the import string for such an instance, like 'mypackage.mymodule:app'. The named module, ‘mypackage.mymodule’, must be importable. If app is defined in a script (say example.py) that is in the current working directory, that is, if you type
    python example.py
in the directory that contains example.py, then Python will be able to import that script. In that case, app can be written as ‘example:app’.
If workers > 1, this must be a str, because a Starlette object cannot be pickled and sent to other processes.
- host
The default '0.0.0.0' is suitable if the code runs within a Docker container.
- port
Port.
- access_log
Whether to let uvicorn emit access logs.
- backlog
The default should be adequate. Don’t make this large unless you know what you’re doing.
uvicorn.Server.startup passes this to asyncio's loop.create_server (in asyncio.base_events, where the default is 100), and in turn to socket.socket.listen. uvicorn uses a large default value (2048). That might be misguided.
See: https://bugs.python.org/issue38699, https://stackoverflow.com/a/2444491, https://stackoverflow.com/a/2444483
- workers
If 1, the server is launched in the current process/thread. If > 1, this many processes are spawned, each running one copy of app independently.
Note that worker processes are not dynamically created and killed according to need: this fixed number of processes is started and they remain active. This is in contrast to Gunicorn. This also makes the parameter worker_contexts meaningful.
Since mpservice.mpserver handles sophisticated multiprocessing servers “natively”, usually you should use workers=1, that is, let uvicorn run a simple “in-process” worker (implemented by a mpservice.mpserver.AsyncServer object) and let AsyncServer handle multiple worker processes.
There may be cases where workers > 1 has some speed advantage. One possible reason for such an advantage is savings on pickling overhead. You need to benchmark your particular use case to decide.
- worker_contexts
If provided, this must be a list or tuple with as many elements as workers. The elements of worker_contexts will become the attribute worker_context of app in the corresponding worker process (or the current process if workers=1).
This is mainly designed for passing config-style data to worker processes when workers > 1. The app object is available to the “lifespan” of a Starlette object; the user can obtain app.worker_context there and decide how to use it to initialize things, save it, or make it available to endpoint functions via the lifespan’s “state”. See tests for example usage.
- server_id
If you run multiple services in the same program, you need to use server_id to distinguish them and provide the ID to both start_server and stop_server.
However, that use case seems extremely unlikely.
- **kwargs
Passed to uvicorn.Config.
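The requirement that worker_contexts has one element per worker can be illustrated with a small helper. This is a sketch under assumptions: build_worker_contexts, launch, the worker_index key, and the model_name config value are hypothetical names chosen for illustration; only start_server and its workers and worker_contexts parameters come from the signature documented above.

```python
def build_worker_contexts(workers, base_config):
    """Return one config dict per worker, tagged with a worker index."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    return [{**base_config, "worker_index": i} for i in range(workers)]


def launch(workers=2):
    # Imported lazily so the helper above can be used without mpservice installed.
    from mpservice.http import start_server

    contexts = build_worker_contexts(workers, {"model_name": "demo"})
    assert len(contexts) == workers  # one element per worker, as required above
    # An import string is used because `workers > 1`.
    start_server("example:app", workers=workers, worker_contexts=contexts)
```

Plain dicts are used for the contexts because they are easy to send to other processes; each worker would then see its own dict as app.worker_context.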
Serving using named pipes#
The module mpservice.pipe provides tools to use a “named pipe” to communicate between
two Python processes on the same machine.
Usually the two Python processes are two separately started programs. If they are two processes created by multiprocessing in a single program, then you would directly use multiprocessing.Pipe instead of this module.
To start, create a Server object in one process and a Client object in
the other process, providing the same path argument.
The two objects can be created in any order, and their roles are symmetric.
The different names simply remind the user to create one of each
in the two processes.
Two uni-directional pipes are created between the two processes, represented
by the files named f"{path}.1" and f"{path}.2".
In each process, you send() to one pipe and recv() from the other pipe.
The send and recv functions take and return picklable Python objects.
While send does not block as long as the system buffer has space,
recv blocks until one data item is read.
It’s up to the application to design handshaking values understood by both sides.
The roles of the two pipes in the two processes are flipped; this role assignment is taken care of internally. To prevent glitches, make sure the two files are non-existent before server and client are created.
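The idea of two uni-directional pipes with flipped roles can be sketched using only the standard library. This is not the implementation of mpservice.pipe: the choice of which side writes to f"{path}.1", the use of pickle for framing, and the use of None as the stop value are all assumptions of this sketch, and a thread stands in for the second process so that the example runs in one program. It requires a Unix-like system, since it relies on os.mkfifo.

```python
import os
import pickle
import tempfile
import threading


def open_endpoint(path, is_server):
    """Open both FIFOs and return (writer, reader) for this side."""
    # Both sides open "{path}.1" first, then "{path}.2", to avoid a deadlock:
    # opening a FIFO blocks until the other end is opened as well.
    first = open(f"{path}.1", "wb" if is_server else "rb")
    second = open(f"{path}.2", "rb" if is_server else "wb")
    return (first, second) if is_server else (second, first)


def send(writer, obj):
    pickle.dump(obj, writer)
    writer.flush()  # push the bytes into the pipe right away


def recv(reader):
    return pickle.load(reader)  # blocks until one full object is read


def echo_upper(path):
    """The 'client' side: reply with the upper-cased message until told to stop."""
    writer, reader = open_endpoint(path, is_server=False)
    with writer, reader:
        while True:
            msg = recv(reader)
            if msg is None:  # handshake value chosen by this sketch
                break
            send(writer, msg.upper())


def run_demo(messages):
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo")
        os.mkfifo(f"{path}.1")
        os.mkfifo(f"{path}.2")
        peer = threading.Thread(target=echo_upper, args=(path,))
        peer.start()
        writer, reader = open_endpoint(path, is_server=True)
        replies = []
        with writer, reader:
            for m in messages:
                send(writer, m)
                replies.append(recv(reader))
            send(writer, None)
        peer.join()
    return replies


print(run_demo(["hello", "pipe"]))
```

Each side only ever writes to one file and reads from the other, which is the flipped role assignment described above.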
- class mpservice.pipe._Pipe[source]#
Bases: object
See multiprocessing.connection.Connection for documentation on the methods send(), recv(), send_bytes(), recv_bytes(), recv_bytes_into().
Serving using sockets#
The module mpservice.socket provides tools to use sockets to communicate between
two Python processes on the same machine.
- class mpservice.socket.SocketApplication[source]#
Bases: object
SocketApplication is designed to be used similarly to the “application” in an HTTP framework. The main API is to register “endpoint” functions with the method add_route(). This allows the socket service to be backed by multiple functions for different purposes. Usually there is only one main function, which involves transmitting a substantial amount of data between the server and the client. For simplicity, one may use '/' for the path of this route. The other routes are usually supportive, for example, getting server info or setting options. For example:
    app.add_route('/', make_prediction)
    app.add_route('/server-info', get_server_info)
    app.add_route('/set-option', set_server_option)
This class is the intended interface between a socket server and a particular application (functions). Usually, the user should not customize the class SocketServer.
- add_route(path: str, route: Callable[[Any], Awaitable[Any]] | Callable[[], Awaitable[Any]])[source]#
route is an async function that takes a single positional arg, and returns a response (which could be None if so desired). The response should be serializable by the encoder. To be safe, return an object of Python native types. If an exception is raised in this method, an appropriate RemoteException object will be sent in the response. The method could also proactively return a RemoteException object.
path is any string. The route is identified by this string. For familiarity, it may be a good idea to start the string with '/', although this is in no way necessary.
There is no GET/POST distinction like in the case of HTTP.
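The routing idea can be illustrated with a toy route table. This is a stand-in written for illustration, not the SocketApplication implementation: the class MiniApp, the use of inspect to detect zero-argument routes, and the dict used to report errors (in place of RemoteException) are all assumptions of this sketch.

```python
import asyncio
import inspect


class MiniApp:
    """Toy route table; not the mpservice implementation."""

    def __init__(self):
        self._routes = {}

    def add_route(self, path, func):
        # The route is identified by the path string alone.
        self._routes[path] = func

    async def handle_request(self, path, data=None):
        func = self._routes[path]
        try:
            # Assumption of this sketch: zero-parameter routes are called without data.
            if len(inspect.signature(func).parameters) == 0:
                return await func()
            return await func(data)
        except Exception as exc:
            # Assumption: report the failure as a plain dict instead of raising.
            return {"error": type(exc).__name__, "message": str(exc)}


async def double(x):
    return x * 2


async def server_info():
    return {"name": "demo"}


async def fail(x):
    raise ValueError("bad input")


app = MiniApp()
app.add_route('/', double)
app.add_route('/server-info', server_info)
app.add_route('/fail', fail)

print(asyncio.run(app.handle_request('/', 21)))
```

Returning errors as values rather than raising them mirrors the described behavior in which a failure in a route is sent back to the client in the response.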
- class mpservice.socket.SocketServer[source]#
Bases: object
- __init__(app: SocketApplication, *, path: str | None = None, host: str | None = None, port: int | None = None, backlog: int | None = None, shutdown_path: str = '/shutdown')[source]#
backlog is the max concurrent in-progress requests per connection. (Note, the client may open many connections.) This “concurrency” is in terms of concurrent calls to SocketApplication.handle_request().
The type of the service, between ‘tcp’ and ‘unix’, is determined by the parameters path, host, and port. See code for details.
- mpservice.socket.make_server(app: SocketApplication, **kwargs)[source]#
Example:
import asyncio

from mpservice.socket import SocketApplication, make_server


async def double(data):
    await asyncio.sleep(0.01)
    return data * 2


app = SocketApplication()
app.add_route('/', double)
server = make_server(app, path='/tmp/sock_abc')
asyncio.run(server.serve())
- class mpservice.socket.SocketClient[source]#
Bases: object
- __init__(*, path: str | None = None, host: str | None = None, port: int | None = None, num_connections: int | None = None, connection_timeout: int = 60, backlog: int = 2048)[source]#
- Parameters:
- path, host, port
Either path is given (for Unix socket), or port (plus optionally host) is given (for TCP socket). These values should, of course, be consistent with the corresponding server.
- num_connections
This is expected to have a direct impact on the performance, hence needs experimentation.
- connection_timeout
How many seconds to wait while connecting to the server. This is meant for waiting for server to be ready, rather than for the action of “connecting” itself (which should be fast).
- backlog
Size of the queue for in-progress requests.
- request(path: str, data=None, *, enqueue_timeout=None, response_timeout=None)[source]#
This could raise concurrent.futures.TimeoutError. That means the result is not available in the specified time, but the request may well have been sent to the server. However the user handles the exception, it will not affect the server’s response to the request. The user will not be able to resume the wait for the result.
If the caller does not need the response, use response_timeout=0.
In some cases, the request does not need to send data, e.g. if the request is for a certain info query. In such situations, the corresponding function on the server side takes no argument, and in this call to request, data should be None.
Example:
    request('/shutdown', response_timeout=0)
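The point that a timeout only ends the caller's wait, and does not undo the work, can be shown with a standard-library analogy. This sketch does not use SocketClient: a thread pool plays the role of the server, and slow_handler, request_with_timeout, and demo are hypothetical names. Unlike the behavior described above, a plain Future would let the caller wait again; the sketch deliberately discards it.

```python
import concurrent.futures
import time

completed = []  # records work that the "server" actually finished


def slow_handler(x):
    time.sleep(0.2)
    completed.append(x)
    return x * 2


def request_with_timeout(executor, x, response_timeout):
    future = executor.submit(slow_handler, x)
    try:
        return future.result(timeout=response_timeout)
    except concurrent.futures.TimeoutError:
        # The caller gave up waiting; the submitted work keeps running.
        return None


def demo():
    completed.clear()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        reply = request_with_timeout(executor, 21, response_timeout=0.01)
    # Leaving the `with` block waits for outstanding work to finish.
    return reply, list(completed)


print(demo())
```

The caller receives no reply, yet the handler still runs to completion, which matches the statement that handling the exception does not affect the server's processing of the request.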
- stream(path: str, data: Iterable, *, return_x: bool = False, return_exceptions: bool = False, enqueue_timeout=60, response_timeout=60)[source]#
If return_x is True, return a stream of (x, y) tuples, where x is the input data, and y is a dict with element ‘data’. If return_x is False, return a stream of y. If return_exceptions is True, y could be an Exception object.