-
Notifications
You must be signed in to change notification settings - Fork 659
Open
Description
Currently, `Buffered` only queues new items at the beginning of each poll, which means `stream.buffered(2)`
does not actually parallelize.
futures-rs/futures-util/src/stream/stream/buffer_unordered.rs
Lines 56 to 87 in cc3a15d
impl<St> Stream for BufferUnordered<St> | |
where | |
St: Stream, | |
St::Item: Future, | |
{ | |
type Item = <St::Item as Future>::Output; | |
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
let mut this = self.project(); | |
// First up, try to spawn off as many futures as possible by filling up | |
// our queue of futures. | |
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) { | |
match this.stream.as_mut().poll_next(cx) { | |
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), | |
Poll::Ready(None) | Poll::Pending => break, | |
} | |
} | |
// Attempt to pull the next value from the in_progress_queue | |
match this.in_progress_queue.poll_next_unpin(cx) { | |
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, | |
Poll::Ready(None) => {} | |
} | |
// If more values are still coming from the stream, we're not done yet | |
if this.stream.is_done() { | |
Poll::Ready(None) | |
} else { | |
Poll::Pending | |
} | |
} |
Metadata
Metadata
Assignees
Labels
No labels