Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inconsistent order of MessageChannel messages and task (race conditions) #133

Open
Prinzhorn opened this issue May 17, 2021 · 9 comments
Open
Labels
enhancement New feature or request

Comments

@Prinzhorn
Copy link
Contributor

Prinzhorn commented May 17, 2021

Piscina 3.0.0 (2.2.0 as well), Node.js 14.17.0 and 16.1.0

In the official example of using MessageChannel the order is not consistent. The task can finish before the message event or the other way round. I was naively expecting every postMessage inside the worker to appear before as a message event before runTask finishes. But the order is seemingly random causing funny side effects on my end.

Here's the code adapted from the official example. I haven't seen it ever happen on the first iteration, but often the second (sometimes a hand full of iterations) will stop because the task resolved before the message event happened.

Expected order: [message, done]
What I see often: [done, message]

index.js

'use strict';

const Piscina = require('piscina');
const { resolve } = require('path');
const { MessageChannel } = require('worker_threads');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js'),
});

(async function () {
  for (let i = 0; i < 1000; i++) {
    const channel = new MessageChannel();
    let events = [];

    channel.port2.on('message', (message) => {
      console.log('message');
      events.push('message');
    });

    await piscina.runTask({ port: channel.port1 }, [channel.port1]);
    console.log('done');
    events.push('done');

    // Make sure we wait for all message events to make it more reliable.
    await new Promise((resolve) => {
      setTimeout(resolve, 100);
    });

    if (events[0] !== 'message' || events[1] !== 'done') {
      console.log(events);
      console.error(new Error('Wrong order'));
      process.exit(1);
    }

    console.log('-----------------------------');
  }

  process.exit(0);
})();

worker.js

'use strict';

module.exports = ({ port }) => {
  port.postMessage('hello from the worker pool');
};

Now it gets more interesting with multiple message, let's send two messages from the worker:

Expected order: [message, message, done]
What I see often: [done, message, message] or even [message, done, message]

index.js

'use strict';

const Piscina = require('piscina');
const { resolve } = require('path');
const { MessageChannel } = require('worker_threads');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js'),
});

(async function () {
  for (let i = 0; i < 1000; i++) {
    const channel = new MessageChannel();
    let events = [];

    channel.port2.on('message', (message) => {
      console.log('message');
      events.push('message');
    });

    await piscina.runTask({ port: channel.port1 }, [channel.port1]);
    console.log('done');
    events.push('done');

    // Make sure we wait for all message events to make it more reliable.
    await new Promise((resolve) => {
      setTimeout(resolve, 100);
    });

    if (events[0] !== 'message' || events[1] !== 'message' || events[2] !== 'done') {
      console.log(events);
      console.error(new Error('Wrong order'));
      process.exit(1);
    }

    console.log('-----------------------------');
  }

  process.exit(0);
})();

worker.js

'use strict';

module.exports = ({ port }) => {
  port.postMessage('hello from the worker pool');
  port.postMessage('hello from the worker pool');
};

With such a small message you usually get [done, message, message] with the second iteration. With a larger message you can get [message, done, message] as well and it often takes more than two iterations.

'use strict';

module.exports = ({ port }) => {
  port.postMessage(Buffer.alloc(1024 * 1024 * 5));
  port.postMessage(Buffer.alloc(1024 * 1024 * 5));
};

E.g. here's a run that just took more than two iterations:

 $ node index.js 
message
message
done
-----------------------------
message
message
done
-----------------------------
message
message
done
-----------------------------
message
message
done
-----------------------------
message
done
message
[ 'message', 'done', 'message' ]
Error: Wrong order
    at /home/alex/src/issues/piscina-race-condition/index.js:32:21

If this is expected behavior on Node.js's end I wonder if Piscina could do something about that? I'm not really into implementing flow control with ACK messages and what not when that's exactly why I'm using Piscina in the first place (native worker threads are somewhat painful to use 😅 ).

@Prinzhorn Prinzhorn changed the title Consistent order of MessageChannel messages and task (race conditions) Inconsistent order of MessageChannel messages and task (race conditions) May 17, 2021
@jasnell
Copy link
Collaborator

jasnell commented May 17, 2021

The timing on here is... difficult. The communication between the worker and the parent is based on MessageChannel and MessagePort... which generally happens synchronously but relies on signaling via the event loop to determine when the actual message notification happens. Signaling the completion of the work is also initially done by message passing but there the notification of "done" occurs via the microtask queue, which is being drained independently of the event loop. There are quite a few factors involved and the timing is exceedingly difficult to reason about. @addaleax may have some ideas here.

@Prinzhorn
Copy link
Contributor Author

Prinzhorn commented May 25, 2021

This is how it looks in Node.js without piscina. I was thinking about using a second MessageChannel for signaling but it looks like there are no guarantees regarding the order. This was unexpected but also kind of makes sense.

index.js

const path = require('path');
const { Worker, MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();

const worker = new Worker(path.join(__dirname, 'worker.js'));

worker.on('message', (message) => {
  console.log(message);
});

port1.on('message', (message) => {
  console.log(message);
});

worker.on('error', (err) => {
  console.error(err);
});

worker.on('exit', (code) => {
  process.exit(code);
});

worker.postMessage(port2, [port2]);

worker.js

const { parentPort } = require('worker_threads');

parentPort.once('message', (port) => {
  port.postMessage('A1');
  parentPort.postMessage('B1');
  port.postMessage('A2');
  parentPort.postMessage('B2');
});

Expected (naively):

A1
B1
A2
B2

Actual (but depends on size of messages):

A1
A2
B1
B2

I assume this is because in the event loop iteration each message channel has a queue and the message are processed without considering the order of other channels. I can accept that. Interestingly enough the parentPort messages are last. So I wonder how piscina manages to race in front of my messages? Probably because of size?

So here's a possible solution for piscina: allow user-land messages to be passed over the same port that piscina is using. But transparently of course. Assuming that the order of postMessage on the same port is guaranteed.

In the worker

// A special `parentPort` that allows sending messages through the same port that piscina uses internally.
const { parentPort } = require('piscina');

Now the question is how this could look on the other end. Could the promise returned from run be also an EventEmitter that can emit message?

@metcoder95
Copy link
Member

So here's a possible solution for piscina: allow user-land messages to be passed over the same port that piscina is using. But transparently of course. Assuming that the order of postMessage on the same port is guaranteed.

Though this can provide a sense of order, the truth is that it won't guarantee to be deterministic.

Now the question is how this could look on the other end. Could the promise returned from run be also an EventEmitter that can emit message?

That will complicate implementation and make things more entangled, separated API will be better.

Though, just to understand your issue. Here you're trying to communicate 1:1 with the workers by not directly providing a task, but rather broadcasting a message?
What's the use case you're trying to fulfill? 🤔

@Prinzhorn
Copy link
Contributor Author

Prinzhorn commented Jun 15, 2023

Though, just to understand your issue. Here you're trying to communicate 1:1 with the workers by not directly providing a task, but rather broadcasting a message?
What's the use case you're trying to fulfill? thinking

I'm streaming results from the task to the client via Websockets. While the task is running I keep sending batches of data events. Once the task is done (the Promise resolves) I send a done event to the client. Due to the race condition the client sometimes gets data events after the done event. What I have done as a workaround is to completely ignore the piscina.run() Promise for control flow and instead send a done message from within the worker. Only rejection of the promise is handled and creates an error event in the client.

@metcoder95
Copy link
Member

metcoder95 commented Jun 16, 2023

Yeah, have a sense.
I think bidirectional communication might be something worth it to explore, though it would be something to be better done off the current pillars of piscina as it is designed to be used as an orchestrator to distribute tasks and handle its result.

Do you have a suggestion or idea of how can the API look like?

@Prinzhorn
Copy link
Contributor Author

I want to point out that what I'm doing is purely unidirectional. The worker is sending data (or task progress) to the main thread. There is no communication in the other direction. The official "progress" example (https://github.com/piscinajs/piscina/blob/current/examples/progress/index.js) likely suffers from the same issue. It can call bar.tick() after await piscina.run() has resolved.

Do you have a suggestion or idea of how can the API look like?

I'm sick rn and can't put any more thoughts into that. The issue could be solved without any API changed if there was a way to only make the task resolve after all MessagePort that were passed to transferList have an empty queue. I don't think Node.js offers any low level control like that?

@metcoder95
Copy link
Member

Using the worker_threads API should make it possible by using some signaling that indicate either the order or how to process each message, but with Piscina might be tricky. Although might be worth it to explore, I'm just trying to draw how it might look like 🤔

Copy link

github-actions bot commented Jun 2, 2024

This issue has been marked as stale because it has been opened 30 days without activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions bot added the stale label Jun 2, 2024
@Prinzhorn
Copy link
Contributor Author

Not stale and still a valid feature request

@metcoder95 metcoder95 added the enhancement New feature or request label Jun 6, 2024
@github-actions github-actions bot removed the stale label Jun 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants