Skip to content

Commit

Permalink
feat: separate part-size for PUT & GET
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Tan <[email protected]>
  • Loading branch information
crrow committed Jul 27, 2024
1 parent ac6c177 commit 7d809bd
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 32 deletions.
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ where
type PutObjectRequest = FailurePutObjectRequest<Client, GetWrapperState>;
type ClientError = Client::ClientError;

fn part_size(&self) -> Option<usize> {
self.client.part_size()
// Forwards to the wrapped client unchanged: this wrapper reports whatever GET part size
// the underlying client reports (including `None`).
fn read_part_size(&self) -> Option<usize> {
self.client.read_part_size()
}

// Forwards to the wrapped client unchanged: this wrapper reports whatever PUT part size
// the underlying client reports (including `None`).
fn write_part_size(&self) -> Option<usize> {
self.client.write_part_size()
}

async fn delete_object(
Expand Down
6 changes: 5 additions & 1 deletion mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,11 @@ impl ObjectClient for MockClient {
type PutObjectRequest = MockPutObjectRequest;
type ClientError = MockClientError;

fn part_size(&self) -> Option<usize> {
// The mock configuration carries a single `part_size`; it is reported as the GET part size.
// Always `Some`, never `None`.
fn read_part_size(&self) -> Option<usize> {
Some(self.config.part_size)
}

// The mock configuration carries a single `part_size`; it is reported as the PUT part size.
// Always `Some`, never `None`.
fn write_part_size(&self) -> Option<usize> {
Some(self.config.part_size)
}

Expand Down
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ impl ObjectClient for ThroughputMockClient {
type PutObjectRequest = MockPutObjectRequest;
type ClientError = MockClientError;

fn part_size(&self) -> Option<usize> {
self.inner.part_size()
// Forwards to the inner client; this wrapper does not change the GET part size.
fn read_part_size(&self) -> Option<usize> {
self.inner.read_part_size()
}

// Forwards to the inner client; this wrapper does not change the PUT part size.
fn write_part_size(&self) -> Option<usize> {
self.inner.write_part_size()
}

async fn delete_object(
Expand Down
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,13 @@ pub trait ObjectClient {
type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;

/// Query the part size this client uses for PUT and GET operations to the object store. This
/// Query the part size this client uses for GET operations to the object store. This
/// can be `None` if the client does not do multi-part operations.
fn part_size(&self) -> Option<usize>;
fn read_part_size(&self) -> Option<usize>;

/// Query the part size this client uses for PUT operations to the object store. This
/// can be `None` if the client does not do multi-part operations.
///
/// The value is a size in bytes.
fn write_part_size(&self) -> Option<usize>;

/// Delete a single object from the object store.
///
Expand Down
58 changes: 41 additions & 17 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ macro_rules! event {
pub struct S3ClientConfig {
auth_config: S3ClientAuthConfig,
throughput_target_gbps: f64,
part_size: usize,
read_part_size: usize,
write_part_size: usize,
endpoint_config: EndpointConfig,
user_agent: Option<UserAgent>,
request_payer: Option<String>,
Expand All @@ -101,7 +102,8 @@ impl Default for S3ClientConfig {
Self {
auth_config: Default::default(),
throughput_target_gbps: 10.0,
part_size: DEFAULT_PART_SIZE,
read_part_size: DEFAULT_PART_SIZE,
write_part_size: DEFAULT_PART_SIZE,
endpoint_config: EndpointConfig::new("us-east-1"),
user_agent: None,
request_payer: None,
Expand All @@ -128,7 +130,22 @@ impl S3ClientConfig {
/// Set the part size for multi-part operations to S3 (both PUT and GET)
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn part_size(mut self, part_size: usize) -> Self {
self.part_size = part_size;
self.read_part_size = part_size;
self.write_part_size = part_size;
self
}

/// Set the part size for multi-part-get operations to S3.
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn read_part_size(mut self, size: usize) -> Self {
self.read_part_size = size;
self
}

/// Set the part size for multi-part-put operations to S3.
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn write_part_size(mut self, size: usize) -> Self {
self.write_part_size = size;
self
}

Expand Down Expand Up @@ -248,7 +265,8 @@ struct S3CrtClientInner {
/// Here it will add the user agent prefix and s3 client information.
user_agent_header: String,
request_payer: Option<String>,
part_size: usize,
read_part_size: usize,
write_part_size: usize,
bucket_owner: Option<String>,
credentials_provider: Option<CredentialsProvider>,
host_resolver: HostResolver,
Expand Down Expand Up @@ -352,13 +370,14 @@ impl S3CrtClientInner {

// max_part_size is 5GB or less depending on the platform (4GB on 32-bit)
let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
if !(5 * 1024 * 1024..=max_part_size).contains(&config.part_size) {
return Err(NewClientError::InvalidConfiguration(format!(
"part size must be at between 5MiB and {}GiB",
max_part_size / 1024 / 1024 / 1024
)));
for part_size in [config.read_part_size, config.write_part_size] {
if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
return Err(NewClientError::InvalidConfiguration(format!(
"part size must be at between 5MiB and {}GiB",
max_part_size / 1024 / 1024 / 1024
)));
}
}
client_config.part_size(config.part_size);

let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
let user_agent_header = user_agent.build();
Expand All @@ -373,7 +392,8 @@ impl S3CrtClientInner {
next_request_counter: AtomicU64::new(0),
user_agent_header,
request_payer: config.request_payer,
part_size: config.part_size,
read_part_size: config.read_part_size,
write_part_size: config.write_part_size,
bucket_owner: config.bucket_owner,
credentials_provider: Some(credentials_provider),
host_resolver,
Expand Down Expand Up @@ -1145,11 +1165,18 @@ impl ObjectClient for S3CrtClient {
type PutObjectRequest = S3PutObjectRequest;
type ClientError = S3RequestError;

fn part_size(&self) -> Option<usize> {
fn read_part_size(&self) -> Option<usize> {
// TODO: the CRT does some clamping to a max size rather than just swallowing the part size
// we configured it with, so this might be wrong. Right now the only clamping is to the max
// S3 part size (5GiB), so this shouldn't affect the result.
Some(self.inner.part_size)
Some(self.inner.read_part_size)
}

fn write_part_size(&self) -> Option<usize> {
    // TODO: this reports the value we configured, not the value the CRT ends up using. The CRT
    // clamps the part size rather than taking it verbatim, so the two could diverge. Today the
    // only clamp is to the maximum S3 part size (5GiB), so the answer should still be right.
    let configured = self.inner.write_part_size;
    Some(configured)
}

async fn delete_object(
Expand Down Expand Up @@ -1225,10 +1252,7 @@ mod tests {

/// Test explicit validation in [Client::new]
fn client_new_fails_with_invalid_part_size(part_size: usize) {
let config = S3ClientConfig {
part_size,
..Default::default()
};
let config = S3ClientConfig::default().part_size(part_size);
let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
let message = if cfg!(target_pointer_width = "64") {
"invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
Expand Down
10 changes: 7 additions & 3 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use pin_project::pin_project;
use crate::object_client::{ETag, GetBodyPart, GetObjectError, ObjectClientError, ObjectClientResult};
use crate::s3_crt_client::{GetObjectRequest, S3CrtClient, S3HttpRequest, S3Operation, S3RequestError};

use super::S3CrtClientInner;

impl S3CrtClient {
/// Create and begin a new GetObject request. The returned [GetObjectRequest] is a [Stream] of
/// body parts of the object, which will be delivered in order.
Expand Down Expand Up @@ -59,10 +61,12 @@ impl S3CrtClient {

let (sender, receiver) = futures::channel::mpsc::unbounded();

let request = self.inner.make_meta_request(
message,
S3Operation::GetObject,
let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::GetObject);
options.part_size(self.inner.read_part_size as u64);
let request = self.inner.make_meta_request_from_options(
options,
span,
|_| (),
|_, _| (),
move |offset, data| {
let _ = sender.unbounded_send(Ok((offset, data.into())));
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl S3CrtClient {
let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject);
options.send_using_async_writes(true);
options.on_upload_review(move |review| callback.invoke(review));
options.part_size(self.inner.write_part_size as u64);

// Before the first write, we need to await for the multi-part upload to be created, so we can report errors.
// To do so, we need to detect one of two events (whichever comes first):
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/tests/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn test_get_object_backpressure(size: usize, range: Option<Range<u64>>) {
async fn verify_backpressure_get_object() {
let initial_window_size = 256;
let client: S3CrtClient = get_test_backpressure_client(initial_window_size);
let part_size = client.part_size().unwrap();
let part_size = client.read_part_size().unwrap();

let size = part_size * 2;
let range = 0..(part_size + 1) as u64;
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async fn test_put_object_write_cancelled() {
.expect("put_object should succeed");

// Write a multiple of `part_size` to ensure it will not complete immediately.
let full_size = client.part_size().unwrap() * 10;
let full_size = client.write_part_size().unwrap() * 10;
let buffer = vec![0u8; full_size];

// Complete one write to ensure the MPU was created and the buffer for the upload request is available.
Expand Down
30 changes: 29 additions & 1 deletion mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,38 @@ pub struct CliArgs {
long,
help = "Part size for multi-part GET and PUT",
default_value = "8388608",
value_name = "Size",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
help_heading = CLIENT_OPTIONS_HEADER
)]
pub part_size: u64,

// Optional override of `--part-size` for GET requests only; `None` means "fall back to
// `--part-size`". A plain `//` comment is used on purpose: a `///` doc comment on a clap
// field would be picked up as help text.
#[clap(
    long,
    long_help = "Optional part size, in bytes, for multi-part GET operations.
Tweak this argument for better download throughput and efficiency.
Defaults to the value of --part-size if not specified.",
    value_name = "Size",
    value_parser = value_parser!(u64).range(1..usize::MAX as u64),
    help_heading = CLIENT_OPTIONS_HEADER
)]
pub read_part_size: Option<u64>,

// Optional override of `--part-size` for PUT requests only; `None` means "fall back to
// `--part-size`". A plain `//` comment is used on purpose: a `///` doc comment on a clap
// field would be picked up as help text.
#[clap(
    long,
    long_help = "Optional part size, in bytes, for multi-part PUT operations.
Tweak this argument for better upload throughput and efficiency.
Defaults to the value of --part-size if not specified.",
    value_name = "Size",
    value_parser = value_parser!(u64).range(1..usize::MAX as u64),
    help_heading = CLIENT_OPTIONS_HEADER
)]
pub write_part_size: Option<u64>,

#[clap(
long,
help = "Owner UID [default: current user's UID]",
Expand Down Expand Up @@ -618,7 +645,8 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
let mut client_config = S3ClientConfig::new()
.auth_config(auth_config)
.throughput_target_gbps(throughput_target_gbps)
.part_size(args.part_size as usize)
.read_part_size(args.read_part_size.unwrap_or(args.part_size) as usize)
.write_part_size(args.write_part_size.unwrap_or(args.part_size) as usize)
.user_agent(user_agent);
if args.requester_pays {
client_config = client_config.request_payer("requester");
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/prefetch/part_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
assert!(preferred_part_size > 0);
let request_range = range.align(client.part_size().unwrap_or(8 * 1024 * 1024) as u64, true);
let request_range = range.align(client.read_part_size().unwrap_or(8 * 1024 * 1024) as u64, true);
let start = request_range.start();
let size = request_range.len();

Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<Client: ObjectClient> UploadRequest<Client> {
let request = inner.client.put_object(bucket, key, &params).await?;
let maximum_upload_size = inner
.client
.part_size()
.write_part_size()
.map(|ps| ps.saturating_mul(MAX_S3_MULTIPART_UPLOAD_PARTS));

Ok(Self {
Expand Down

0 comments on commit 7d809bd

Please sign in to comment.