diff --git a/s3/Cargo.toml b/s3/Cargo.toml index 5d0576c7ba..8c9fe5ac33 100644 --- a/s3/Cargo.toml +++ b/s3/Cargo.toml @@ -86,7 +86,7 @@ default = ["fail-on-err", "tags", "tokio-native-tls"] sync = ["attohttpc", "maybe-async/is_sync"] with-async-std-hyper = ["with-async-std", "surf/hyper-client"] with-async-std = ["async-std", "futures"] -with-tokio = ["futures", "reqwest", "tokio", "tokio/fs", "tokio-stream"] +with-tokio = ["futures/alloc", "reqwest", "tokio", "tokio/fs", "tokio-stream"] blocking = ["block_on_proc", "tokio/rt", "tokio/rt-multi-thread"] fail-on-err = [] diff --git a/s3/src/request/request_trait.rs b/s3/src/request/request_trait.rs index a6409b4f52..27383e5152 100644 --- a/s3/src/request/request_trait.rs +++ b/s3/src/request/request_trait.rs @@ -119,6 +119,29 @@ impl fmt::Display for ResponseData { } } +#[cfg(feature = "with-tokio")] +impl tokio::io::AsyncRead for ResponseDataStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + use futures::StreamExt; + let bytes = self.get_mut().bytes(); + match bytes.poll_next_unpin(cx) { + std::task::Poll::Ready(Some(Ok(chunk))) => { + buf.put_slice(&chunk); // Put the chunk into the buffer + std::task::Poll::Ready(Ok(())) + } + std::task::Poll::Ready(Some(Err(error))) => { + std::task::Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, error))) + } + std::task::Poll::Ready(None) => std::task::Poll::Ready(Ok(())), + std::task::Poll::Pending => std::task::Poll::Pending, + } + } +} + #[maybe_async::maybe_async] pub trait Request { type Response;