worker: add connect and setConnectionsListener
This PR adds two new APIs to worker_threads that allow for cross-thread communication via MessagePorts.
A thread can invoke worker.connect to start a new connection to another thread. This method is blocking. Upon success, the return value is a MessagePort which can be used to exchange messages.
The core idea is that a thread willing to accept connections from other threads uses worker.setConnectionsListener to install a callback that is invoked with a thread id, a port and (optional) data each time another thread attempts a connection.
The listener can return true to accept the connection. Any other return value will result in the connection being refused.
By default, if a thread has no listener associated, the connection will be refused.
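The accept/refuse rules above can be modelled in a single thread. This is only a sketch of the described semantics, not the PR's implementation: the `listeners` registry, the explicit `threadId`/`fromId` arguments, the `'secret'` token and the error messages are all assumptions made for illustration. The local `connect` and `setConnectionsListener` functions are stand-ins for the proposed APIs. It follows the blocking variant described in this description, so `connect` returns the port directly.

```javascript
'use strict';
const { MessageChannel, receiveMessageOnPort } = require('node:worker_threads');

// Hypothetical in-process registry: thread id -> listener callback.
const listeners = new Map();

// Stand-in for the proposed setConnectionsListener(). The thread id is
// explicit here only because this model runs inside one thread.
function setConnectionsListener(threadId, listener) {
  listeners.set(threadId, listener);
}

// Stand-in for the proposed connect(): returns a MessagePort on success and
// throws when the target refuses or has no listener installed.
function connect(fromId, targetId, data) {
  const listener = listeners.get(targetId);
  if (typeof listener !== 'function') {
    throw new Error(`Thread ${targetId} refused the connection (no listener)`);
  }
  const { port1, port2 } = new MessageChannel();
  // Only a strict `true` accepts; any other return value refuses.
  if (listener(fromId, port2, data) !== true) {
    port1.close();
    throw new Error(`Thread ${targetId} refused the connection`);
  }
  return port1;
}

// "Thread 1" accepts only callers that present the expected token.
let acceptedPort;
setConnectionsListener(1, (sender, port, data) => {
  if (data !== 'secret') return false;
  acceptedPort = port;
  return true;
});

// "Thread 2" connects and sends a message over the returned port.
const port = connect(2, 1, 'secret');
port.postMessage('hello from thread 2');
const received = receiveMessageOnPort(acceptedPort).message;
console.log(received);
port.close();
```

Requiring a strict `true` keeps a listener that forgets to return a value from silently accepting connections.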
Notable Change Text
A new set of experimental APIs has been added to worker_threads: connect and setConnectionsListener. These APIs aim to simplify 1-1 inter-thread communication in Node.js.
Every thread (including the main one) can start a connection to any other thread (including the main one) using the connect API and providing the target threadId.
If the connection is successful, the call will return a MessagePort that can be used for the communication.
A thread can opt in to receive incoming connections by calling the setConnectionsListener API. The listener will be invoked for each connection attempt and must return true to accept the connection. A thread without a connection listener will refuse any connection by default.
CC: @dygabo
I think this all looks fine.
- Do we want to give this an experimental status? Maybe active development?
Good point. I think I should.
- Should the docs mention that this is blocking?
Yes, I forgot about it.
- Does this have any effect on current worker threads performance?
I don't think so. The communication to accept new connections uses the same port which is already in place for communication with the parent thread. I just added a new message. After that it only creates a new MessageChannel. Memory-wise, the only addition is basically a global std::map, and while memory usage obviously increases a little bit (remember we're only storing pointers), I don't think performance is impacted.
CI: https://ci.nodejs.org/job/node-test-pull-request/59840/
@GeoffreyBooth Modifications done.
it seems inherently race conditiony
@benjamingr why?
This drives workers further away from web workers and I'm not sure what it solves since:
I can see it, but I think it's necessary to allow better intra-process communication.
- The workers can't know the worker they're connecting to already set up its setConnectionsListener
They can, indirectly. We might need extended documentation for this, but the general idea is that workers sync up (and share their threadId) via BroadcastChannel. When both workers are ready, one can start a connection to the other one.
- If they do - they inherently have the knowledge of whomever created the workers (e.g. isMainThread) in that case they can already make a MessagePort and postMessage it to both workers
Not really. It's really hard to properly propagate ports, especially if you think about several levels of workers. Nowadays if you, for instance, have a 10-level worker hierarchy and you want level 2 to speak to a worker on level 9, you have to forward a port down 7 levels somehow. Also, once the port is transferred, the original forwarding thread will have no connection with the target thread.
With this architecture instead a worker on level 9 can advertise its threadId via BroadcastChannel and the interested parties will be able to directly connect without forwarding anything.
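To make the forwarding cost concrete, here is a minimal single-thread sketch of passing one end of a channel down a chain of parent/child links. It uses only existing `worker_threads` APIs (`MessageChannel`, the transfer list, `receiveMessageOnPort`). The number of hops and the variable names are arbitrary, and real code would have each hop live in a different worker.

```javascript
'use strict';
const { MessageChannel, receiveMessageOnPort } = require('node:worker_threads');

// Channel between a "level 2" endpoint and whichever thread holds the far end.
const link = new MessageChannel();
let travellingPort = link.port2;

// Model a chain of parent -> child channels. Each hop must explicitly
// transfer the port onward, and gives it up locally by doing so.
const HOPS = 3; // arbitrary depth for the example
for (let level = 0; level < HOPS; level++) {
  const hop = new MessageChannel();
  // Parent side: hand the port over via the transfer list.
  hop.port1.postMessage({ forwarded: travellingPort }, [travellingPort]);
  // Child side: receive it and become the new (and only) holder.
  travellingPort = receiveMessageOnPort(hop.port2).message.forwarded;
  hop.port1.close();
}

// Only the final holder can now talk to the level 2 endpoint.
link.port1.postMessage('ping from level 2');
const delivered = receiveMessageOnPort(travellingPort).message;
console.log(delivered);

link.port1.close();
```

Every intermediate level needs code for the hand-off, which is the routing burden the comment above refers to.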
I won't block but I'm not a fan of this API (because it seems inherently race conditiony)
See my next comment.
After @benjamingr and @dygabo commented I double checked and there was a possible race condition when a worker was blocked during a connect and a connection request arrived on itself.
The connect with my last push becomes async, while the connection listener can now return a Promise. This solves the race condition.
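A rough model of the asynchronous shape described here, under stated assumptions: `connect` and `listener` below are local stand-ins, not the PR's functions, and passing the listener as an argument replaces the real cross-thread lookup. The `role` field is made up. The point is only that the caller awaits the listener's decision instead of blocking, so its own event loop stays free to answer incoming requests.

```javascript
'use strict';
const { MessageChannel } = require('node:worker_threads');

// Hypothetical listener on the target side; it decides asynchronously.
async function listener(senderId, port, data) {
  await new Promise((resolve) => setImmediate(resolve)); // pretend async work
  return data?.role === 'trusted';
}

// Hypothetical async connect(): resolves with a MessagePort when the
// listener resolves to `true`, rejects for any other outcome.
async function connect(senderId, targetListener, data) {
  const { port1, port2 } = new MessageChannel();
  const accepted = await targetListener(senderId, port2, data);
  if (accepted !== true) {
    port1.close();
    throw new Error('Connection refused');
  }
  return port1;
}

// Two attempts in flight at once; neither blocks the event loop.
const outcome = Promise.allSettled([
  connect(1, listener, { role: 'trusted' }),
  connect(1, listener, { role: 'unknown' }),
]).then((results) => {
  const statuses = results.map((r) => r.status);
  if (results[0].status === 'fulfilled') results[0].value.close();
  console.log(statuses);
  return statuses;
});
```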
CI: https://ci.nodejs.org/job/node-test-pull-request/59863/
Thanks, the main race condition I noticed was addressed. Now I'm trying to come up with the use case. I need to know the threadId of the worker I'm connecting to - how do I get it? Can't whomever gave it to me give me a created MessagePort instead?
Also:
The target thread must have a connection listener setup via [worker.setConnectionsListener()][] otherwise the connection request will fail. When no listeners are present (the default) all connection requests are immediately refused.
This sounds incorrect? It should wait for a connection listener to exist so there isn't a race between the worker setting up a connection listener and the other thread calling worker.connect.
As for the other issue (adding more APIs to worker). I would strongly prefer it if these were static methods (that would ideally also work or can be made to work with web workers). That would address our "standards like" thing (Worker) not deviating from the spec further while it still being possible to add the API. i.e. worker_threads.connect(worker, target, data) and worker_threads.setConnectionsListener(worker, fn). This is what we've done in events (with EventTarget) for example and how we've extended standard APIs in the past.
Nowadays if you, for instance, have a 10-level workers hierarchy and you want the level 2 to speak to a worker on level 9 you have to forward a port down 7 levels somehow.
I think I'm also missing something technical here - why is it easier to broadcast its threadId and hard to broadcast a MessagePort?
(I don't understand your use case well enough to block or approve yet so I'm avoiding either)
(I think this sort of experimentation is very valuable on its own, and I encourage you to keep doing it regardless of whether or not this lands)
I tend to agree with @benjamingr -- it's not ideal to introduce further differences to other thread implementations.
I agree, but I think in the browser context they never had to deal with such a potentially complex case, since you typically don't do this in a client.
Nowadays if you, for instance, have a 10-level workers hierarchy and you want the level 2 to speak to a worker on level 9 you have to forward a port down 7 levels somehow.
Is this a common use case? I am genuinely curious because it reminds me of anti-patterns in traditional OOP design.
10 is purposely exaggerated, but even 4 levels is enough to hit the problem. I agree on anti-patterns, but we all know the rules are only followed in theory, don't we? :)
I think I'm also missing something technical here - why is it easier to broadcast its threadId and hard to broadcast a MessagePort?
Because BroadcastChannel does not support TransferList.
Thanks, the main race condition I noticed was addressed. Now I'm trying to come up with the use case. I need to know the threadId of the worker I'm connecting to - how do I get it? Can't whomever gave it to me give me a created MessagePort instead?
Also:
The target thread must have a connection listener setup via [worker.setConnectionsListener()][] otherwise the connection request will fail. When no listeners are present (the default) all connection requests are immediately refused.
This sounds incorrect? It should wait for a connection listener to exist so there isn't a race between the worker setting up a connection listener and the other thread calling worker.connect.
I'll address all the questions in this comment, in the hope that it better explains why I developed this API and why I did it this way. While working on #53200, @dygabo discovered an edge case with the hooks thread. The explanation is here but I'll generalize it in this thread as well, as it applies to similar scenarios.
The core idea behind the hooks thread (HT for short) is that every other thread in the application can talk to it. In order to achieve this, every time a new worker is created, the parent thread is responsible for creating a message channel. One end of the channel is sent to the child thread, the other one to the HT. Given only one HT can exist, if that thread is created in one section of the thread tree (think of the left side of a binary tree) there is a chance another section never gets the link to the HT, simply because the HT didn't exist when the parent thread was created. Currently, there is no easy way to establish the missing link.
If you generalize this problem, you can easily see that MessagePort forwarding doesn't work very well once the thread tree is more than a few levels deep. A parent thread must either transfer the port (and thus lose it locally) or create complex routing logic that doesn't scale well.
Thus, I explored other solutions. The BroadcastChannel worked really well for sharing info at any level, but it doesn't support a transferList. So I implemented the connect functionality. If you combine the two, you get full thread-to-thread communication at any level.
For instance, let me show how we can apply this to the hooks thread problem.
- Node starts, no HT present
- A lot of workers, several levels deep, are created. All these threads listen on a specific BroadcastChannel (which, I emphasize, is like a topic and it is identified by a string).
- One of the workers (in which level doesn't matter) creates the HT thread.
- Upon completing its boot, the HT publishes on the same BroadcastChannel its threadId
- All other workers can store the threadId locally (used for the next step) and obtain a connection to HT.
- When a new worker is starting, the parent thread can provide it the HT threadId so that the new worker can establish a connection.
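The steps above can be sketched with the existing `BroadcastChannel` API. To stay deterministic this model runs in one thread and polls with `receiveMessageOnPort` instead of using real workers and `onmessage` handlers. The topic name, the message shape and every thread id are made up for the example, and the final connection step is only noted in a comment because it depends on the API proposed in this PR.

```javascript
'use strict';
const { BroadcastChannel, receiveMessageOnPort } = require('node:worker_threads');

const TOPIC = 'hooks-thread'; // hypothetical channel name

// Steps 1-2: "workers" subscribe to the topic before the HT exists.
const workers = [10, 11, 12].map((threadId) => ({
  threadId,
  channel: new BroadcastChannel(TOPIC),
  hooksThreadId: undefined,
}));

// Steps 3-4: the HT boots and publishes its own threadId on the same topic.
const HT_THREAD_ID = 42; // made-up id for the example
const htChannel = new BroadcastChannel(TOPIC);
htChannel.postMessage({ type: 'ht-ready', threadId: HT_THREAD_ID });

// Step 5: every worker stores the id. In the PR's design it would then
// pass that id to the proposed connect() to obtain a MessagePort.
for (const worker of workers) {
  const received = receiveMessageOnPort(worker.channel);
  if (received?.message?.type === 'ht-ready') {
    worker.hooksThreadId = received.message.threadId;
  }
}

// Step 6: a parent hands the stored id to a newly created worker, which
// was not subscribed when the announcement was broadcast.
const lateWorker = { threadId: 13, hooksThreadId: workers[0].hooksThreadId };

console.log(workers.map((w) => w.hooksThreadId), lateWorker.hooksThreadId);

// Close the channels so they do not keep the process alive.
htChannel.close();
for (const worker of workers) worker.channel.close();
```

Step 6 matters because a broadcast is not replayed: a subscriber created after the announcement never sees it.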
This pattern can obviously be applied to other similar cases. Let me know if this explanation is enough or if you have other questions.
For instance, let me show how we can apply to the hooks thread problem.
@ShogunPanda I know I am not a core collaborator and my opinions carry little weight in the decision making. But I really appreciate the effort and energy you put into this. FWIW I believe, as already mentioned, that this goes in the right direction if we want to solve it with the single thread (which I am not so sure we still want). I still do not believe it covers the race case correctly, because of timing and synchronicity requirements. Mainly, I think point 4 from your list does not happen in time for TN.2 from this example. Especially considering that TN.2 is at this point in its execution here, which needs to be sync because this port is needed exactly for syncifying async operations via the worker thread.
Or am I missing something?
@dygabo First of all, don't think so little of yourself. Your efforts in loaders are as important as mine.
About the problem above, we can solve it by delegating the creation of the hooks thread to the main thread (which, with this PR, any thread can speak to from any level). Since the main thread processes events in order, it will recognize the second request from either of the racing threads and eventually queue it up to respond with the same object to both.
Using Atomics and similar we can mimic sync calls in async context.
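As a minimal illustration of that idea, the sketch below blocks one thread with `Atomics.wait` until a worker finishes an asynchronous step and calls `Atomics.notify`. The two-slot memory layout, the 10 ms delay, the 5 second timeout and the value 42 are arbitrary choices for the example, not anything taken from the PR.

```javascript
'use strict';
const { Worker } = require('node:worker_threads');

// Shared memory: index 0 is a "done" flag, index 1 carries the result.
const shared = new Int32Array(new SharedArrayBuffer(8));

// The worker does its work asynchronously, then publishes the result and
// wakes up whoever is waiting on the flag.
const worker = new Worker(`
  const { workerData: shared } = require('node:worker_threads');
  setTimeout(() => {
    Atomics.store(shared, 1, 42);
    Atomics.store(shared, 0, 1);
    Atomics.notify(shared, 0);
  }, 10);
`, { eval: true, workerData: shared });

// Block this thread while the flag is still 0, giving up after 5 seconds.
const status = Atomics.wait(shared, 0, 0, 5000);
const result = Atomics.load(shared, 1);
console.log(status, result);
```

While the waiting thread is blocked its event loop does not run, so a thread parked this way cannot answer requests addressed to it.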
that would go in the direction of the idea that I tried to validate explained here
There would be a channel at every moment between every thread and the main thread (hooksPortServiceProvider - name up for debate). Each thread can use that to create a message channel and send one port via the main thread to the hooks thread. If the hooks thread does not exist, the main thread will create it. I think that has better chances but I'm not sure it covers all cases.
My previous concerns still stand regarding correctness in case of an error on the hooks thread and what happens if that thread dies. I would think it would either be restarted (but then all the connected threads are no longer connected to the new one) or it pushes the exit event over to all the others, killing them for a reason not obvious to them.
Because BroadcastChannel does not support TransferList.
Ah that makes sense, I understand the use case thanks for explaining. I would prefer a utility method on the worker_threads module rather than an instance method if possible for the compatibility concerns.
I think I might have written something wrong in the docs, but this is already on the module level, not on the instance
Ah I see (and also the tests), then that SGTM. I think this would also benefit a lot from an example in the docs.
Is this ready for review?
Is this ready for review?
From my perspective yes.
CI: https://ci.nodejs.org/job/node-test-pull-request/59925/
CI: https://ci.nodejs.org/job/node-test-pull-request/59926/
CI: https://ci.nodejs.org/job/node-test-pull-request/59927/
The https://github.com/nodejs/node/labels/notable-change label has been added by @benjamingr.
Please suggest a text for the release notes if you'd like to include a more detailed summary, then proceed to update the PR description with the text or a link to the notable change suggested text comment. Otherwise, the commit will be placed in the Other Notable Changes section.
CI: https://ci.nodejs.org/job/node-test-pull-request/59939/
CI: https://ci.nodejs.org/job/node-test-pull-request/59946/
FWIW I'm not sure if these new APIs will allow to solve the problem of #53200. But well, they might be helpful for other cases :o).
After a careful analysis, I came to a solution which is way simpler: it doesn't mess with C++ and it allows the same communication.
See https://github.com/nodejs/node/pull/53682. Closing this.