python – Decorator to instantiate a class in another process

Motivation

I want to execute some heavy computing tasks in a separate process, so that they do not monopolize the GIL and can make effective use of a multi-core machine.

Where those tasks are pure functions, I would just use the provided multiprocessing.Pool. However, that does not work so well for tasks that maintain state. As a running example, take a process that encrypts data on the fly and pumps it to a file. I would like the keys, the block-chaining parameters and the open file handle (which cannot be pickled and passed between processes) to live as internal state of some EncryptedWriter object. I want to use the public interface of that object completely transparently, but have the object itself reside in the external process.
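For contrast, here is a minimal sketch of the pure-function case that multiprocessing.Pool already handles well (hash_block is a hypothetical stand-in for the heavy computation):

from multiprocessing import Pool
import hashlib

def hash_block(block):
    # A pure function: everything it needs arrives as an argument,
    # so Pool can pickle inputs and outputs freely.
    return hashlib.sha256(block).digest()

if __name__ == "__main__":
    blocks = [f"block-{i}".encode() for i in range(8)]
    with Pool() as pool:
        digests = pool.map(hash_block, blocks)  # spread across worker processes

No equivalent one-liner exists once the worker has to carry state, like an open file handle, between calls.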


Overview

To that end, this code creates a decorator @process_wrap_object which wraps a class. The new class spawns an external process in which an object of the wrapped class is instantiated; the worker then calls that object's methods in the order requested and sends back the associated return values. A coordinating object living in the original process is responsible for forwarding those method calls.
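Concretely, each call crosses the process boundary as a (method_name, args, kwargs) tuple on the instruction queue, and the return value travels back on the output queue. For example:

# What the coordinating object enqueues for writer.write_data("hello"):
("write_data", ("hello",), {})

# The reserved shutdown message sent when the coordinating object dies:
("_close", (), {})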

The function process_wrap_object is the decorator itself; it takes a class and returns a class.

The function _process_wrap_event_loop is what runs in the worker process, and it is closely coupled to process_wrap_object.

Finally, the function _process_disconnection_detector simply watches for the coordinating process_wrap_object object being destroyed, whether by normal garbage collection or because the main process crashed. Either way, it instructs the worker process to shut down cleanly.


Warnings

Note that method calls are blocking, just like ordinary method calls. This means that, by itself, this wrapper will not speed anything up: it simply does the work elsewhere, with extra overhead. However, it cooperates effectively with a main process that spins the blocking calls off into lightweight threads of its own.
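For example, a minimal sketch (using the decorated EncryptedWriter from the sample use below) of keeping the main thread free by pushing the blocking call into a cheap thread:

from threading import Thread

writer = EncryptedWriter("notes.priv", 4610)

# write_data blocks until the worker process replies, but the GIL is
# released while this thread waits on the queue, so the main thread
# stays responsive while the heavy encryption runs on another core.
t = Thread(target=writer.write_data, args=("some large payload",))
t.start()
# ... main thread does other work here ...
t.join()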


Code

import inspect
from functools import partial
from multiprocessing import Process, Queue, Pipe
from threading import Thread

CLOSE_CODE = "_close"

def _process_disconnection_detector(pipe, instruction_queue):
    """Watcher thread function that triggers the process to close if its partner dies"""
    try:
        pipe.recv()
    except EOFError:
        instruction_queue.put((CLOSE_CODE, (), {}))


def _process_wrap_event_loop(new_cls, instruction_queue, output_queue, pipe, *args, **kwargs):
    cls = new_cls.__wrapped__
    obj = cls(*args, **kwargs)

    routines = inspect.getmembers(obj, inspect.isroutine)
    # Inform the partner class what instructions are valid
    output_queue.put([r[0] for r in routines if not r[0].startswith("_")])
    # and record them for the event loop
    routine_lookup = dict(routines)

    disconnect_monitor = Thread(target=_process_disconnection_detector, args=(pipe, instruction_queue))
    disconnect_monitor.start()

    while True:
        instruction, inst_args, inst_kwargs = instruction_queue.get()
        if instruction == CLOSE_CODE:
            break
        inst_op = routine_lookup[instruction]
        res = inst_op(*inst_args, **inst_kwargs)
        output_queue.put(res)

    disconnect_monitor.join()

def process_wrap_object(cls):
    """
    Class decorator which exposes the same public method interface as the original class,
    but the object itself resides and runs on a separate process.
    """
    class NewCls:
        def __init__(self, *args, **kwargs):
            self._instruction_queue = Queue() # Queue format is ({method_name}, {args}, {kwargs})
            self._output_queue = Queue() # Totally generic queue, will carry the return type of the method
            self._pipe1, pipe2 = Pipe() # Just a connection to indicate to the worker process when it can close
            self._process = Process(
                target=_process_wrap_event_loop,
                args=(NewCls, self._instruction_queue, self._output_queue, pipe2) + args,
                kwargs=kwargs
            )
            self._process.start()

            routine_names = self._output_queue.get()

            assert CLOSE_CODE not in routine_names, "Cannot wrap class with reserved method name."

            for r in routine_names:
                self.__setattr__(
                    r,
                    partial(self.trigger_routine, r)
                )

        def trigger_routine(self, routine_name, *trigger_args, **trigger_kwargs):
            self._instruction_queue.put((routine_name, trigger_args, trigger_kwargs))
            return self._output_queue.get()

        def __del__(self):
            # When the holding object gets destroyed,
            # tell the process to shut down.
            self._pipe1.close()
            self._process.join()

    for wa in ('__module__', '__name__', '__qualname__', '__doc__'):
        setattr(NewCls, wa, getattr(cls, wa))
    setattr(NewCls, "__wrapped__", cls)

    return NewCls

Sample use:

@process_wrap_object
class EncryptedWriter:
    def __init__(self, filename, key):
        """Details unimportant, perhaps self._file = File(filename)"""
    def write_data(self, data):
        "Details still unimportant, perhaps self._file.write(encrypt(data))"

writer = EncryptedWriter(r"C:UsersJosiahDesktopnotes.priv", 4610)
writer.write_data("This message is top secret and needs some very slow encryption to secure.")
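Dropping the last reference is enough to tear the worker down:

# NewCls.__del__ closes its end of the pipe; the watcher thread in the
# worker sees EOFError, enqueues the "_close" instruction, the event
# loop exits, and the process is joined.
del writer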

I am looking for a general review of both the high-level approach and the particular implementation, with special interest in any subtleties around multiprocessing or correct decorator wrapping for classes.

Any suggestions for additional functionality that would make this decorator noticeably more useful are also welcome. One feature that I am considering, but have not yet implemented, is explicit support for __enter__ and __exit__ so that wrapped objects work in with blocks.
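A rough sketch of how that might look as extra methods on NewCls, assuming the wrapped class implements the context-manager protocol (untested; these bodies are my guesses, not existing code):

        def __enter__(self):
            # Run the wrapped object's __enter__ remotely, but return the
            # proxy: the remote object itself generally cannot be pickled.
            self.trigger_routine("__enter__")
            return self

        def __exit__(self, exc_type, exc_value, tb):
            # Caveat: traceback objects are not picklable, so only the type
            # and value are forwarded; the wrapped __exit__ receives None.
            return self.trigger_routine("__exit__", exc_type, exc_value, None)

Since routine_lookup in the worker keeps every routine (only the names advertised back to the partner are filtered for leading underscores), these dunder calls would dispatch through the existing event loop unchanged.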