Readable.flatMap concurrency isn't working as expected #52796

Open
MoLow opened this issue May 2, 2024 · 3 comments · May be fixed by #52816
Labels
stream: Issues and PRs related to the stream subsystem.

Comments

MoLow commented May 2, 2024

Version

v22.0.0

Platform

Darwin Moshes-MBP.localdomain 23.3.0 Darwin Kernel Version 23.3.0: Wed Dec 20 21:30:44 PST 2023; root:xnu-10002.81.5~7/RELEASE_ARM64_T6000 arm64

Subsystem

stream

What steps will reproduce the bug?

Run this code:

const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
const mapped = base.flatMap(async function * (x) {
  await setTimeout(1000);
  yield x * 2;
}, { concurrency: 200 });

(async () => {
  for await (const chunk of mapped) {
    console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
  }
})();

How often does it reproduce? Is there a required condition?

Always.

What is the expected behavior? Why is that the expected behavior?

Since concurrency is set to 200, I expect all the chunks to be emitted together, i.e. all of them roughly one second after the start:

{ chunk: 2 } 1
{ chunk: 4 } 1
{ chunk: 6 } 1
{ chunk: 8 } 1
{ chunk: 10 } 1

What do you see instead?

Instead, there is a one-second gap between each chunk:
{ chunk: 2 } 1
{ chunk: 4 } 2
{ chunk: 6 } 3
{ chunk: 8 } 4
{ chunk: 10 } 5

Additional information

When running similar code using Readable.map, the concurrency works as expected:

const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
const mapped = base.map(async function(x) {
  await setTimeout(1000);
  return x * 2;
}, { concurrency: 200 });

(async () => {
  for await (const chunk of mapped) {
    console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
  }
})();

results in

{ chunk: 2 } 1
{ chunk: 4 } 1
{ chunk: 6 } 1
{ chunk: 8 } 1
{ chunk: 10 } 1
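
A possible workaround until this is fixed (a sketch, not part of the original report, assuming map's concurrency behaves as in the example above): drain each async generator inside the map callback, so the awaits run during map's concurrent phase, and flatten the buffered arrays afterwards:

const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

// The per-item async generator from the original reproduction.
async function* work(x) {
  await setTimeout(1000);
  yield x * 2;
}

const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
const mapped = base
  .map(async (x) => {
    // Drain the generator here so its awaits happen while map()
    // is running callbacks concurrently.
    const out = [];
    for await (const v of work(x)) out.push(v);
    return out;
  }, { concurrency: 200 })
  .flatMap((arr) => arr); // arrays are iterable, so this just flattens them

(async () => {
  for await (const chunk of mapped) {
    console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
  }
})();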
MoLow added the stream label on May 2, 2024.
MoLow commented May 2, 2024

I think the issue is this implementation:

function flatMap(fn, options) {
  const values = map.call(this, fn, options);
  return async function* flatMap() {
    for await (const val of values) {
      yield* val;
    }
  }.call(this);
}

It relies on map internally to perform the queuing, but the items aren't generated yet at the time the queuing is performed (i.e. what is queued is the async generators themselves, not their generated values).
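
For illustration (a standalone sketch, not from the original comment): calling an async generator function returns a generator object immediately without running its body, so the await inside only executes once the generator is iterated, which in the implementation above happens sequentially during yield*:

const { setTimeout } = require('node:timers/promises');

// Calling an async generator function does not start its body;
// the body (and its await) only runs once the generator is iterated.
async function* slow(x) {
  console.log('body started for', x); // not printed at call time
  await setTimeout(1000);
  yield x * 2;
}

const gen = slow(1);              // returns immediately, body has not run
console.log('generator created'); // printed before 'body started for 1'

(async () => {
  for await (const value of gen) {
    console.log({ value });       // the one-second delay is only paid here
  }
})();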

MoLow commented May 2, 2024

CC @nodejs/streams

mcollina commented May 2, 2024

very odd one indeed!

MoLow linked a pull request (#52816) on May 3, 2024 that will close this issue.