Missing piece: worker process pools, run_in_worker_process
It'd be nice to have a clean way to dispatch CPU-bound work. Possibly based on multiprocessing.
An interesting question is how this interacts with task-local storage (#2). run_in_worker_thread can easily preserve task-local context because it's in-process (and there's no contention, because the owning task is frozen for the duration of the run_in_worker_thread call). For worker processes, it's not so simple -- what if there's something in the context that can't be pickled? Should changes made in the process be propagated back?
One possibility is that when creating task-local objects, they could be tagged as multiprocess friendly or not, e.g. trio.TaskLocal(pickle=True).
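A purely hypothetical sketch of what that tagging could look like (neither this `TaskLocal` wrapper nor `collect_picklable_locals` is a real Trio API; it only illustrates filtering task-locals by a `pickle` flag before handing them to a worker process):

```python
# Hypothetical sketch only: the idea is that task-locals explicitly tagged as
# multiprocess-friendly get serialized and shipped to the worker process, and
# everything else stays behind in the parent.
import cloudpickle

class TaskLocal:
    def __init__(self, *, pickle=False):
        self.pickle = pickle   # may this value cross a process boundary?
        self.value = None

def collect_picklable_locals(task_locals):
    # task_locals: mapping of name -> TaskLocal for the current task
    safe = {name: tl.value for name, tl in task_locals.items() if tl.pickle}
    return cloudpickle.dumps(safe)
```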
Should I work on something like trio-multiprocessing?
@auvipy Well I mean... it's up to you :-). I think it would be great to have multiprocessing-like tools in Trio. There are lots of things that would be great though, so it really depends on whether it's something you want to work on :-).
Since I haven't looked at this issue in like, 2 years, let me update with some more recent thoughts:
I wouldn't try to use the multiprocessing library, because it's incredibly complex internally, and fragile in lots of ways. I would do something from scratch, and focus on a small set of features with lots of polish.
I think I'd probably do it as a third-party library, instead of baking it into trio itself, because it's actually a pretty complex and self-contained project.
My first goal would be a really seamless await run_in_process(async_fn, *args). I'd use cloudpickle instead of pickle, to avoid a lot of hassles that multiprocessing users run into. I'd try hard to make it friendly to use: make sure control-C works in a sensible way, make sure you can use print inside a subprocess to see what's happening, make sure that if the parent dies unexpectedly the children don't get left behind as orphans, that kind of thing.
Next goal would probably be some kind of process pool support, to cache processes between invocations, because process startup is very expensive, much more so than threads.
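As a rough illustration of that caching idea (everything here is hypothetical, not an existing Trio API): keep a deque of idle workers and only pay the startup cost when the cache is empty.

```python
import collections

class ProcessCache:
    """Cache idle workers so the next call can skip interpreter startup.

    `spawn` is any async callable that starts a fresh worker and returns an
    object with an `is_alive()` method -- a stand-in for whatever process
    handle a real library would use.
    """
    def __init__(self, spawn):
        self._spawn = spawn
        self._idle = collections.deque()

    async def checkout(self):
        while self._idle:
            worker = self._idle.popleft()
            if worker.is_alive():      # drop workers that died while idle
                return worker
        return await self._spawn()     # cache empty: pay the startup cost once

    def checkin(self, worker):
        self._idle.append(worker)      # keep it warm for the next caller
```

A real implementation would also want to cap the pool size and decide what happens to cached workers on cancellation or shutdown.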
Then if I wanted to get really fancy, I'd think about ways to seamlessly pass channel objects between processes, so that you can set up producers and consumers in different processes and they can talk to each other.
I understand. Thanks for explaining. Celery needs a better billiard :)
Ah, I see! Well, then I guess you understand why I am hesitant to suggest using multiprocessing itself as the base :-).
@njsmith I'd be curious to get your feedback on this proof-of-concept for a run_in_process function.
https://github.com/ethereum/trinity/pull/1079/files
Specifically, I'm interested in what you think about how I leveraged the trio subprocess APIs to achieve the process isolation.
This is probably something that I'll keep internal to our codebase for a bit while I polish it, but if this approach is something you think is acceptable for larger scale use then I'd like to get it packaged up for others to use as well.
@pipermerriam I guess there are two important decisions that affect how you implement a run_in_worker_process-style API:
- Do you want to spawn a new process each time, or re-use processes? The former is simpler and avoids issues with state leaking between invocations; the latter is faster because you can amortize the process startup cost.
- Do you want to use the child's stdio to communicate with the parent, or do you want to use some other mechanism? The former is simpler to implement; the latter lets you use stuff like `print` and `pdb` in the child.
It looks like in your prototype, you're going for "new process each time" and "use stdio". In that case, you can simplify the code a lot, to something like:
```python
import sys

import cloudpickle
import trio

async def run_in_worker_process(async_fn, *args):
    encoded_job = cloudpickle.dumps((async_fn, args))
    p = await trio.run_process([sys.executable, "-m", __name__],
                               stdin=encoded_job, capture_stdout=True)
    return cloudpickle.loads(p.stdout)

if __name__ == "__main__":
    async_fn, args = cloudpickle.load(sys.stdin.detach())
    retval = trio.run(async_fn, *args)
    cloudpickle.dump(retval, sys.stdout.detach())
```
If you want to re-use the processes but are happy with using stdio, then you need to add something like the framing protocol that you use in your prototype. But you don't need to create any pipes manually or anything – you can use trio.open_process(..., stdin=subprocess.PIPE, stdout=subprocess.PIPE), and then process.stdio is a regular trio Stream that you can use to send and receive data. This way Trio takes care of setting up the pipes, handling unix vs windows differences, choosing which low-level system API to use when working with those pipes, etc. You might also want #1104 to make the process lifetime management easier. Ideally you also want #174 so you can use Trio in the child when talking to stdin/stdout, though the way your prototype uses sync operations to talk to stdio and then calls trio.run in between would also work.
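For instance, a minimal length-prefixed framing layer over `process.stdio` might look like the following sketch. The `worker` module name and the 4-byte length prefix are arbitrary choices, not part of any existing API (and in newer Trio releases `open_process` lives under `trio.lowlevel`):

```python
import struct
import subprocess
import sys

import trio

async def send_frame(stream, payload):
    # 4-byte little-endian length prefix, then the payload itself.
    await stream.send_all(struct.pack("<I", len(payload)) + payload)

async def receive_exactly(stream, count):
    buf = bytearray()
    while len(buf) < count:
        chunk = await stream.receive_some(count - len(buf))
        if not chunk:
            raise EOFError("worker closed the connection")
        buf += chunk
    return bytes(buf)

async def receive_frame(stream):
    (length,) = struct.unpack("<I", await receive_exactly(stream, 4))
    return await receive_exactly(stream, length)

async def example():
    # Trio sets up the pipes; process.stdio is a Stream stapling stdin+stdout.
    process = await trio.open_process(
        [sys.executable, "-m", "worker"],          # "worker" is a placeholder module
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    await send_frame(process.stdio, b"job 1")
    print(await receive_frame(process.stdio))
    await send_frame(process.stdio, b"job 2")      # same process, reused
    print(await receive_frame(process.stdio))
    await process.stdio.aclose()                   # EOF tells the worker to exit
    await process.wait()
```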
If you don't want to use stdio... well, I guess if you only care about one-shot processes then the simplest solution is to pass data through temp files :-). But let's say you choose the fanciest option, where you need persistent processes that you can send messages to and receive messages from, and we use some other channel for communication. In this case you need to do something like the following (a Unix-flavored sketch appears after the list):
- Create your pipes, using the appropriate method on Windows and Unix. (On Windows we don't have any public API for this yet... that's #824. On Unix you can use `os.pipe` + `trio.hazmat.FdStream`. Note: in your prototype you use `trio.open_file` – don't do that! That's for disk files, and it uses an inefficient thread-based API underneath, because that's the only thing that works for disk files. For pipes there are much better APIs, and `FdStream` will use them.)
- Pass the pipes into the child on some non-standard descriptor. On Windows this involves `subprocess.STARTUPINFO.lpAttributeList`. On Unix you use the `pass_fds` argument to `run_process`/`open_process`. Also tell the child where to find the pipes/handles, using e.g. command line arguments.
- In the child, convert those back into some kind of usable file objects. If the child is sync, then on Windows this involves `msvcrt.open_osfhandle` to convert from a Windows handle into an fd, and then on all platforms, once you have an fd you can use `io.open` to convert it into a Python file object. If the child is async, then again you need #824 and `FdStream`.
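To make the Unix path concrete, here is a minimal sketch under those assumptions: the child is the sync variant, `child_worker.py` is a placeholder file name, error handling and the Windows branch are omitted, and `trio.hazmat.FdStream` is spelled `trio.lowlevel.FdStream` in newer Trio releases.

```python
# Parent side: pass one end of an os.pipe() to the child via pass_fds,
# and wrap our end in FdStream so we can talk to it from Trio.
import os
import sys

import trio

async def parent():
    read_fd, write_fd = os.pipe()              # child reads, parent writes
    to_child = trio.hazmat.FdStream(write_fd)  # FdStream takes ownership of write_fd
    proc = await trio.open_process(
        [sys.executable, "child_worker.py", str(read_fd)],  # tell the child its fd
        pass_fds=[read_fd],                    # keep this descriptor open in the child
    )
    os.close(read_fd)                          # parent no longer needs the child's end
    async with to_child:
        await to_child.send_all(b"hello from the parent\n")
    # Closing our end delivers EOF to the child, which then exits.
    await proc.wait()

trio.run(parent)
```

And the corresponding synchronous child, following the `io.open` route described above:

```python
# child_worker.py -- the fd number arrives as argv[1]
import io
import sys

fd = int(sys.argv[1])
with io.open(fd, "rb") as from_parent:   # io.open turns the fd into a file object
    print("child received:", from_parent.read())
```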
@njsmith great feedback and lots of good pointers.
I spent another few hours tinkering and have things in a state that I'm starting to get more happy with. I'm not necessarily convinced that any of my current design decisions are exactly right but behavior-wise it's quickly approaching the general purpose API that I'm shooting for.
- It no longer uses `stdin`/`stdout` for child process communication. Child processes are passed file descriptors (thanks for the tip about `pass_fds`) and use those for communication between parent and child.
- Things like `SIGINT` and `SIGTERM` should just work (they are relayed to child processes when they occur in the parent process; a sketch of that relaying is shown after this list).
- `SIGKILL` should also just work, causing immediate termination.
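A minimal, Unix-flavored sketch of that relaying: `trio.open_signal_receiver` and `Process.send_signal` are real Trio APIs, but the `relay_signals`/`supervise` wrappers here are just for illustration, not the code from the linked repository.

```python
import signal

import trio

async def relay_signals(process):
    # Forward SIGINT/SIGTERM received by the parent straight to the child.
    with trio.open_signal_receiver(signal.SIGINT, signal.SIGTERM) as signal_aiter:
        async for signum in signal_aiter:
            process.send_signal(signum)

async def supervise(command):
    process = await trio.open_process(command)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(relay_signals, process)
        returncode = await process.wait()
        nursery.cancel_scope.cancel()   # stop relaying once the child has exited
    return returncode
```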
The code doesn't currently re-use child processes, but I've had that functionality in mind and I'm reasonably confident that it can be added without too much effort. For my specific use case this isn't a high-value feature, but I know there are plenty of use cases where eliminating this startup overhead is what makes offloading CPU-bound work worthwhile, so it's definitely on the roadmap.
Again, curious to hear any new thoughts you might have but I don't want to take up too much of your time.
Update: The previously linked PR is still a fine way to view the code but I've moved it to https://github.com/ethereum/trio-run-in-process which has the code in a more organized fashion.
Celery could benefit greatly from this.