Serving mpserver

Serving using http

The module mpservice.http provides simple utilities for serving mpservice.mpserver.AsyncServer over HTTP using uvicorn and starlette.

This utility code is not directly connected to AsyncServer. AsyncServer simply provides the method call(), which can be awaited from an HTTP request handler function, and AsyncServer itself has nothing to do with HTTP. The example below shows one way to wire things up:

# "example.py"

import asyncio
import contextlib
import os
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import PlainTextResponse, JSONResponse
from mpservice.mpserver import AsyncServer
from mpservice.http import start_server, stop_server


async def handle_request(request):
    ...
    ...
    result = await request.state.model.call(...)
    ...
    return JSONResponse(...)


async def stop(request):
    await stop_server()
    return PlainTextResponse("server shutdown as requested", status_code=200)


@contextlib.asynccontextmanager
async def lifespan(app):
    async with AsyncServer(...) as model:
        yield {'model': model}  # available in endpoint functions via `request.state`


app = Starlette(lifespan=lifespan,
                routes=[
                    Route('/', handle_request),
                    Route('/stop', stop, methods=['POST']),
                ])

if __name__ == '__main__':
    start_server('example:app')

As demonstrated, lifespan can set up (async) context managers and other resources, and make them available to endpoint functions through the lifespan’s “state”. The app received by lifespan has an attribute worker_context (added by start_server). It can be used to pass config-style data from the main process, where start_server is called, to the worker processes.
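The lifespan pattern, including one way to consume worker_context, can be tried without Starlette or mpservice installed. In this sketch:

- a SimpleNamespace stands in for the Starlette app;
- make_model is a hypothetical placeholder for AsyncServer(...);
- falling back to None when worker_context is absent is an assumption, not documented behavior.

```python
import asyncio
import contextlib
from types import SimpleNamespace


@contextlib.asynccontextmanager
async def make_model(config):
    # Hypothetical stand-in for `AsyncServer(...)`: set up on enter, tear down on exit.
    model = {"config": config, "running": True}
    try:
        yield model
    finally:
        model["running"] = False


@contextlib.asynccontextmanager
async def lifespan(app):
    # `worker_context` is attached by start_server; None if absent (assumption).
    ctx = getattr(app, "worker_context", None)
    async with make_model(ctx) as model:
        # Whatever is yielded here is exposed to endpoints via `request.state`.
        yield {"model": model, "worker_context": ctx}


async def demo():
    app = SimpleNamespace(worker_context={"device": "cpu"})  # stand-in for a Starlette app
    async with lifespan(app) as state:
        return state["model"]["config"], state["model"]["running"]


print(asyncio.run(demo()))  # ({'device': 'cpu'}, True)
```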

The use of starlette is very lightweight: it just handles HTTP routing and request acceptance/response.

mpservice.http.start_server(app: ASGIApplication | str, *, host: str = '0.0.0.0', port: int = 8000, access_log: bool = False, backlog: int = 128, workers: int = 1, worker_contexts: list | None = None, log_config=None, server_id: str = '0', **kwargs)

This function is intended for launching a service that uses mpservice.mpserver.

This function is adapted from uvicorn.main.run.

Parameters:
app

Usually a starlette.applications.Starlette instance, or the import string for such an instance, like 'mypackage.mymodule:app'. The module so named, ‘mypackage.mymodule’, must be importable. If app is defined in a script (say example.py) in the current working directory, that is, if you type

python example.py

in the directory that contains example.py, then Python will be able to import that script. In that case, app can be written as ‘example:app’.

If workers > 1, this must be a str, because a Starlette object cannot be pickled and sent to other processes.
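The ‘module:attribute’ convention can be illustrated with importlib. This is a simplified sketch, not uvicorn's own importer (uvicorn ships an `import_from_string` helper with fuller error reporting). The function name resolve is hypothetical.

The sketch also shows why a string works across processes: each worker can import the module and look up the attribute itself, so no object needs to be pickled.

```python
import importlib


def resolve(import_str: str):
    """Resolve 'package.module:attr.subattr' to the named object (simplified sketch)."""
    module_name, sep, attrs = import_str.partition(":")
    if not sep or not module_name or not attrs:
        raise ValueError(f"expected '<module>:<attribute>', got {import_str!r}")
    obj = importlib.import_module(module_name)  # the module must be importable
    for attr in attrs.split("."):  # allow dotted attribute paths
        obj = getattr(obj, attr)
    return obj
```

With example.py in the current directory, `resolve("example:app")` would return the app object defined there.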

host

The default '0.0.0.0' is suitable if the code runs within a Docker container.

port

The port to listen on.

access_log

Whether to let uvicorn emit access logs.

backlog

The default should be adequate. Don’t make this large unless you know what you’re doing.

uvicorn.Server.startup passes this to the asyncio method loop.create_server (in asyncio.base_events, where the default is 100), which in turn passes it to socket.socket.listen.

uvicorn uses a large default value (2048). That might be misguided.

See: https://bugs.python.org/issue38699, https://stackoverflow.com/a/2444491, https://stackoverflow.com/a/2444483
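At the asyncio level, the backlog ends up as a keyword argument of loop.create_server. The sketch below passes it through asyncio.start_server, which forwards extra keyword arguments to loop.create_server.

The echo handler, the loopback host, and port 0 (any free port) are illustrative choices. uvicorn's own startup code is more involved.

```python
import asyncio


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Echo one line back, then close the connection.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main(backlog: int = 128) -> bytes:
    # `backlog` bounds the queue of pending connections not yet accepted;
    # it is forwarded to loop.create_server and then to socket.listen.
    server = await asyncio.start_server(handle, "127.0.0.1", 0, backlog=backlog)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"ping\n")
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply


print(asyncio.run(main()))  # b'ping\n'
```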

workers

If 1, the server is launched in the current process/thread. If > 1, this many processes are spawned, each running one copy of app independently.

Note that, in contrast to Gunicorn, worker processes are not created and killed dynamically according to need. A fixed number of processes are started, and they remain active. This is also what makes the parameter worker_contexts meaningful.

Since mpservice.mpserver handles sophisticated multiprocessing servers “natively”, you should usually use workers=1. That is, let uvicorn run a simple “in-process” worker (implemented by an mpservice.mpserver.AsyncServer object), and let AsyncServer handle multiple worker processes.

There may be cases where workers > 1 has a speed advantage. One possible source of such an advantage is reduced pickling overhead. Benchmark your particular use case to decide.

worker_contexts

If provided, this must be a list or tuple with as many elements as workers. Its elements become the attribute worker_context of app in the corresponding worker processes (or in the current process if workers=1).

This is mainly designed for passing config-style data to worker processes when workers > 1. The app object is available to the “lifespan” of a Starlette object. There, the user can obtain app.worker_context and decide how to use it: to initialize things, to save it, or to make it available to endpoint functions via the lifespan’s “state”. See tests for example usage.

server_id

If you run multiple services in the same program, you need to use server_id to distinguish them and provide the ID to both start_server and stop_server.

However, that use case seems extremely unlikely.

**kwargs

Passed to uvicorn.Config.

async mpservice.http.stop_server(server_id: str = '0')

This function is to be called in a web service endpoint to request termination of the service.

Serving using named pipes

The module mpservice.pipe provides tools to use a “named pipe” to communicate between two Python processes on the same machine.

Usually the two Python processes are two separately started programs. If they are two processes created by multiprocessing in a single program, then you would directly use multiprocessing.Pipe instead of this module.

To start, create a Server object in one process and a Client object in the other process, providing the same path argument. The two objects can be created in any order, and their roles are symmetric. The different names simply remind the user to create one of each in the two processes.

Two uni-directional pipes are created between the two processes, represented by the files named f"{path}.1" and f"{path}.2". In each process, you send() to one pipe and recv() from the other. The send and recv functions take and return picklable Python objects. send does not block as long as the system buffer has space, whereas recv blocks until one data item has been read.

It’s up to the application to design handshaking values understood by both sides.

The roles of the two pipes are flipped between the two processes; this role assignment is taken care of internally. To prevent glitches, make sure the two files do not exist before the server and client are created.
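The mechanism can be sketched with the standard library on POSIX systems (os.mkfifo is not available on Windows). FifoEnd is a hypothetical class, not mpservice.pipe's implementation, and it makes several assumptions:

- the server reads f"{path}.1" and writes f"{path}.2", and the client does the opposite;
- each pickled object is framed with an 8-byte length prefix;
- a thread stands in for the second program.

Opening a FIFO blocks until the other end is opened too. Both sides therefore open the ".1" file first; opening the files in opposite orders would deadlock.

```python
import os
import pickle
import struct
import tempfile
import threading


class FifoEnd:
    """One end of a two-way channel built from two named pipes (hypothetical, POSIX only)."""

    def __init__(self, path: str, is_server: bool) -> None:
        p1, p2 = f"{path}.1", f"{path}.2"
        for p in (p1, p2):
            try:
                os.mkfifo(p)
            except FileExistsError:
                pass  # the other side created it first
        # Both sides open ".1" first so the blocking opens pair up.
        # Assumed roles: the server reads ".1" and writes ".2"; the client is flipped.
        if is_server:
            self._r = open(p1, "rb", buffering=0)
            self._w = open(p2, "wb", buffering=0)
        else:
            self._w = open(p1, "wb", buffering=0)
            self._r = open(p2, "rb", buffering=0)

    def send(self, obj) -> None:
        payload = pickle.dumps(obj)
        data = memoryview(struct.pack("!Q", len(payload)) + payload)
        while data:  # a write may be partial, so loop until everything is written
            n = self._w.write(data)
            data = data[n:]

    def _read_exact(self, n: int) -> bytes:
        chunks = []
        while n > 0:
            chunk = self._r.read(n)
            if not chunk:
                raise EOFError("the other side closed the pipe")
            chunks.append(chunk)
            n -= len(chunk)
        return b"".join(chunks)

    def recv(self):
        (size,) = struct.unpack("!Q", self._read_exact(8))  # 8-byte length prefix
        return pickle.loads(self._read_exact(size))

    def close(self) -> None:
        self._r.close()
        self._w.close()


def run_server(path: str) -> None:
    server = FifoEnd(path, is_server=True)
    request = server.recv()
    server.send({"echo": request})
    server.close()


path = os.path.join(tempfile.mkdtemp(), "chan")  # neither FIFO exists yet
t = threading.Thread(target=run_server, args=(path,))
t.start()  # in real use, the server would be a separately started program
client = FifoEnd(path, is_server=False)
client.send([1, 2, 3])
reply = client.recv()
client.close()
t.join()
print(reply)  # {'echo': [1, 2, 3]}
```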

class mpservice.pipe._Pipe

Bases: object

See multiprocessing.connection.Connection for documentation on the methods send(), recv(), send_bytes(), recv_bytes(), recv_bytes_into().

__init__(rpath: str, wpath: str)
send_bytes(buf, offset=0, size=None)
send(obj)
recv_bytes(maxlength=None)
recv_bytes_into(buf, offset=0)
recv()

class mpservice.pipe.Server

Bases: _Pipe

__init__(path: str)

class mpservice.pipe.Client

Bases: _Pipe

__init__(path: str)
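Because these methods are documented by reference to multiprocessing.connection.Connection, their semantics can be tried out with multiprocessing.Pipe. The sketch keeps both ends in one process purely for illustration. It does not use mpservice.pipe itself.

```python
from multiprocessing import Pipe

# Both ends stay in one process here only to show the method semantics;
# in practice each end is used by a different process.
left, right = Pipe()  # duplex: either end can send and receive

left.send({"op": "predict", "x": [1, 2, 3]})  # any picklable object
msg = right.recv()

left.send_bytes(b"abcdef", 2, 3)  # sends the 3 bytes starting at offset 2
sliced = right.recv_bytes()

buf = bytearray(8)
left.send_bytes(b"xyz")
n = right.recv_bytes_into(buf, 1)  # writes into buf starting at byte offset 1

left.close()
right.close()
print(msg, sliced, n, bytes(buf[1:1 + n]))
# {'op': 'predict', 'x': [1, 2, 3]} b'cde' 3 b'xyz'
```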

Serving using sockets

The module mpservice.socket provides tools to use sockets to communicate between two Python processes on the same machine.

class mpservice.socket.SocketApplication

Bases: object

SocketApplication is designed to be used similarly to the “application” in an HTTP framework. The main API is the method add_route(), which registers “endpoint” functions. This allows the socket service to be backed by multiple functions for different purposes.

Usually there is only one main function, which involves transmitting a substantial amount of data between the server and the client. For simplicity, one may use '/' as the path of this route. The other routes are usually supportive, for example, getting server info or setting options. For example:

app.add_route('/', make_prediction)
app.add_route('/server-info', get_server_info)
app.add_route('/set-option', set_server_option)

This class is the intended interface between a socket server and a particular application (functions). Usually, the user should not customize the class SocketServer.

__init__()
add_route(path: str, route: Callable[[Any], Awaitable[Any]] | Callable[[], Awaitable[Any]])

route is an async function that takes a single positional argument (or no argument, as the type hint allows) and returns a response, which could be None if so desired. The response should be serializable by the encoder; to be safe, return an object of Python native types. If an exception is raised in this function, an appropriate RemoteException object will be sent in the response. The function could also proactively return a RemoteException object.

path is any string. The route is identified by this string. For familiarity, it may be a good idea to start the string with '/', although this is in no way necessary.

There is no GET/POST distinction like in the case of HTTP.

async handle_request(path: str, data: Any | None = None)

Dispatch the request to a registered route function.
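The routing and error semantics described above can be sketched in plain asyncio. MiniApp and RemoteError are hypothetical stand-ins for SocketApplication and RemoteException.

The rule that data=None means "call the route without an argument" is an assumption inferred from the description of SocketClient.request, not taken from mpservice's code.

```python
import asyncio
from typing import Any, Awaitable, Callable


class RemoteError:
    """Hypothetical stand-in for RemoteException: carries an error back to the client."""

    def __init__(self, exc: BaseException) -> None:
        self.type_name = type(exc).__name__
        self.message = str(exc)


class MiniApp:
    def __init__(self) -> None:
        self._routes: dict[str, Callable[..., Awaitable[Any]]] = {}

    def add_route(self, path: str, route: Callable[..., Awaitable[Any]]) -> None:
        self._routes[path] = route  # the path is just a lookup key

    async def handle_request(self, path: str, data: Any = None) -> Any:
        try:
            route = self._routes[path]
            # Assumption: no data means the route takes no argument.
            return await (route() if data is None else route(data))
        except Exception as e:
            # Errors become a response object instead of crashing the server.
            return RemoteError(e)


async def double(data):
    return data * 2


async def server_info():
    return {"version": "0.1"}


async def fail(data):
    raise ValueError("bad input")


app = MiniApp()
app.add_route("/", double)
app.add_route("/server-info", server_info)
app.add_route("/fail", fail)


async def demo():
    return [
        await app.handle_request("/", 21),
        await app.handle_request("/server-info"),
        await app.handle_request("/fail", 1),
    ]


results = asyncio.run(demo())
print(results[:2])  # [42, {'version': '0.1'}]
print(results[2].type_name, results[2].message)  # ValueError bad input
```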

class mpservice.socket.SocketServer

Bases: object

__init__(app: SocketApplication, *, path: str | None = None, host: str | None = None, port: int | None = None, backlog: int | None = None, shutdown_path: str = '/shutdown')

backlog is the maximum number of concurrent in-progress requests per connection. (Note that the client may open many connections.) This “concurrency” is in terms of concurrent calls to SocketApplication.handle_request().

Whether the service uses a ‘tcp’ or a ‘unix’ socket is determined by the parameters path, host, and port. See the code for details.

async serve()

Start the server and let it stay up until shutdown conditions are met.
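One way to picture the per-connection backlog is as a cap on concurrently running handler calls. The sketch below illustrates that idea with an asyncio.Semaphore. This is an assumption about the mechanism, not mpservice.socket's code. The inner handle_request is a hypothetical stand-in that records the peak concurrency.

```python
import asyncio


async def serve_connection(requests: list[int], backlog: int) -> tuple[list[int], int]:
    limit = asyncio.Semaphore(backlog)  # at most `backlog` handler calls in flight
    in_flight = 0
    peak = 0

    async def handle_request(x: int) -> int:
        nonlocal in_flight, peak
        async with limit:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated work
            in_flight -= 1
            return x * 2

    # Requests arriving on one connection run concurrently, up to the limit.
    results = await asyncio.gather(*(handle_request(x) for x in requests))
    return results, peak


results, peak = asyncio.run(serve_connection(list(range(10)), backlog=3))
print(results, peak)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 3
```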

mpservice.socket.make_server(app: SocketApplication, **kwargs)

Example:

import asyncio

from mpservice.socket import SocketApplication, make_server


async def double(data):
    await asyncio.sleep(0.01)
    return data * 2

app = SocketApplication()
app.add_route('/', double)

server = make_server(app, path='/tmp/sock_abc')
asyncio.run(server.serve())

class mpservice.socket.SocketClient

Bases: object

__init__(*, path: str | None = None, host: str | None = None, port: int | None = None, num_connections: int | None = None, connection_timeout: int = 60, backlog: int = 2048)
Parameters:
path, host, port

Either path is given (for a Unix socket), or port (plus, optionally, host) is given (for a TCP socket). These values should, of course, be consistent with the corresponding server.

num_connections

This is expected to affect performance directly, so choosing a value requires experimentation.

connection_timeout

How many seconds to wait while connecting to the server. This is meant for waiting for server to be ready, rather than for the action of “connecting” itself (which should be fast).

backlog

Size of the queue for in-progress requests.

__enter__()
__exit__(*args, **kwargs)
request(path: str, data=None, *, enqueue_timeout=None, response_timeout=None)

This could raise concurrent.futures.TimeoutError. That means the result was not available within the specified time, but the request may well have been sent to the server. However the user handles the exception, it will not affect the server’s response to the request. The user will not be able to resume waiting for the result.

If caller does not need the response, use response_timeout=0.

In some cases, the request does not need to send data, for example if the request is an information query. In such situations, the corresponding function on the server side takes no argument, and in this call to request, data should be None.

Example:

with SocketClient(path='/tmp/sock_abc') as client:
    client.request('/shutdown', response_timeout=0)
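The timeout semantics resemble those of an ordinary concurrent.futures.Future: giving up on the wait does not undo work that is already underway. In this sketch, a thread pool stands in for the server, an assumption purely for illustration. slow_request is a hypothetical handler. Its side effect shows that processing still completes after the caller has timed out, even though the caller never reads the result.

```python
import concurrent.futures
import time

processed: list[int] = []


def slow_request(x: int) -> int:
    time.sleep(0.2)  # simulated server-side processing
    processed.append(x)  # happens regardless of the caller's timeout
    return x + 1


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(slow_request, 41)  # the "request" is already on its way
    try:
        fut.result(timeout=0.01)  # the caller stops waiting early
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True
# Leaving the `with` block waits for the pending work to finish.

print(timed_out, processed)  # True [41]
```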

stream(path: str, data: Iterable, *, return_x: bool = False, return_exceptions: bool = False, enqueue_timeout=60, response_timeout=60)

If return_x is True, return a stream of (x, y) tuples, where x is the input data, and y is a dict with element ‘data’. If return_x is False, return a stream of y. If return_exceptions is True, y could be an Exception object.
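The shapes of the items produced by stream can be illustrated with a local stand-in. stream_shape is hypothetical: it applies a function in-process and in order, instead of sending data to a server. It only demonstrates how return_x and return_exceptions change what is yielded.

```python
from typing import Any, Callable, Iterable, Iterator


def stream_shape(
    data: Iterable[Any],
    fn: Callable[[Any], Any],
    *,
    return_x: bool = False,
    return_exceptions: bool = False,
) -> Iterator[Any]:
    for x in data:
        try:
            y = {"data": fn(x)}  # y is a dict with element 'data'
        except Exception as e:
            if not return_exceptions:
                raise
            y = e  # the exception object takes the place of y
        yield (x, y) if return_x else y


def inverse(x):
    return 1 / x


print(list(stream_shape([1, 2], inverse)))
# [{'data': 1.0}, {'data': 0.5}]
print(list(stream_shape([2, 0], inverse, return_x=True, return_exceptions=True)))
# [(2, {'data': 0.5}), (0, ZeroDivisionError('division by zero'))]
```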