Skip to content

Commit

Permalink
Add arguments to specify GET and PUT part size independently (#949)
Browse files Browse the repository at this point in the history
* feat: separate part-size for PUT & GET

Signed-off-by: Ryan Tan <[email protected]>

* chore: follow import style

Signed-off-by: Ryan Tan <[email protected]>

* fix: simplify cli help; make separated part-size conflict with old one; use read_part_size when get

Signed-off-by: Ryan Tan <[email protected]>

* Verify new separated part size arg is conflicted with old one

Signed-off-by: Ryan Tan <[email protected]>

* Drop Option<u64> on part-size

Signed-off-by: Ryan Tan <[email protected]>

* Move part-size back

Signed-off-by: Ryan Tan <[email protected]>

---------

Signed-off-by: Ryan Tan <[email protected]>
  • Loading branch information
crrow committed Jul 30, 2024
1 parent b4e11b8 commit 0fff132
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 34 deletions.
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ where
type PutObjectRequest = FailurePutObjectRequest<Client, GetWrapperState>;
type ClientError = Client::ClientError;

fn part_size(&self) -> Option<usize> {
self.client.part_size()
// Failure injection does not affect part sizes: report whatever GET part size the wrapped
// client reports.
fn read_part_size(&self) -> Option<usize> {
self.client.read_part_size()
}

// Report whatever PUT part size the wrapped client reports.
fn write_part_size(&self) -> Option<usize> {
self.client.write_part_size()
}

async fn delete_object(
Expand Down
6 changes: 5 additions & 1 deletion mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,11 @@ impl ObjectClient for MockClient {
type PutObjectRequest = MockPutObjectRequest;
type ClientError = MockClientError;

fn part_size(&self) -> Option<usize> {
// The mock config carries one `part_size` value; it is reported as the GET part size.
fn read_part_size(&self) -> Option<usize> {
Some(self.config.part_size)
}

// The mock config carries one `part_size` value; it is reported as the PUT part size.
fn write_part_size(&self) -> Option<usize> {
Some(self.config.part_size)
}

Expand Down
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ impl ObjectClient for ThroughputMockClient {
type PutObjectRequest = MockPutObjectRequest;
type ClientError = MockClientError;

fn part_size(&self) -> Option<usize> {
self.inner.part_size()
// Report the GET part size of the inner client this wrapper delegates to.
fn read_part_size(&self) -> Option<usize> {
self.inner.read_part_size()
}

// Report the PUT part size of the inner client this wrapper delegates to.
fn write_part_size(&self) -> Option<usize> {
self.inner.write_part_size()
}

async fn delete_object(
Expand Down
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,13 @@ pub trait ObjectClient {
type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;

/// Query the part size this client uses for PUT and GET operations to the object store. This
/// Query the part size this client uses for GET operations to the object store. This
/// can be `None` if the client does not do multi-part operations.
fn part_size(&self) -> Option<usize>;
fn read_part_size(&self) -> Option<usize>;

/// Query the part size this client uses for PUT operations to the object store. This
/// can be `None` if the client does not do multi-part operations.
fn write_part_size(&self) -> Option<usize>;

/// Delete a single object from the object store.
///
Expand Down
57 changes: 40 additions & 17 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ macro_rules! event {
pub struct S3ClientConfig {
auth_config: S3ClientAuthConfig,
throughput_target_gbps: f64,
part_size: usize,
read_part_size: usize,
write_part_size: usize,
endpoint_config: EndpointConfig,
user_agent: Option<UserAgent>,
request_payer: Option<String>,
Expand All @@ -101,7 +102,8 @@ impl Default for S3ClientConfig {
Self {
auth_config: Default::default(),
throughput_target_gbps: 10.0,
part_size: DEFAULT_PART_SIZE,
read_part_size: DEFAULT_PART_SIZE,
write_part_size: DEFAULT_PART_SIZE,
endpoint_config: EndpointConfig::new("us-east-1"),
user_agent: None,
request_payer: None,
Expand All @@ -128,7 +130,22 @@ impl S3ClientConfig {
/// Set the part size for multi-part operations to S3 (both PUT and GET)
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn part_size(mut self, part_size: usize) -> Self {
self.part_size = part_size;
self.read_part_size = part_size;
self.write_part_size = part_size;
self
}

/// Set the part size for multi-part-get operations to S3.
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn read_part_size(mut self, size: usize) -> Self {
self.read_part_size = size;
self
}

/// Set the part size for multi-part-put operations to S3.
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn write_part_size(mut self, size: usize) -> Self {
self.write_part_size = size;
self
}

Expand Down Expand Up @@ -248,7 +265,8 @@ struct S3CrtClientInner {
/// Here it will add the user agent prefix and s3 client information.
user_agent_header: String,
request_payer: Option<String>,
part_size: usize,
read_part_size: usize,
write_part_size: usize,
bucket_owner: Option<String>,
credentials_provider: Option<CredentialsProvider>,
host_resolver: HostResolver,
Expand Down Expand Up @@ -352,13 +370,15 @@ impl S3CrtClientInner {

// max_part_size is 5GB or less depending on the platform (4GB on 32-bit)
let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
if !(5 * 1024 * 1024..=max_part_size).contains(&config.part_size) {
return Err(NewClientError::InvalidConfiguration(format!(
"part size must be at between 5MiB and {}GiB",
max_part_size / 1024 / 1024 / 1024
)));
// TODO: Review the part size validation for read_part_size; it could have a more relaxed limit.
for part_size in [config.read_part_size, config.write_part_size] {
if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
return Err(NewClientError::InvalidConfiguration(format!(
"part size must be at between 5MiB and {}GiB",
max_part_size / 1024 / 1024 / 1024
)));
}
}
client_config.part_size(config.part_size);

let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
let user_agent_header = user_agent.build();
Expand All @@ -373,7 +393,8 @@ impl S3CrtClientInner {
next_request_counter: AtomicU64::new(0),
user_agent_header,
request_payer: config.request_payer,
part_size: config.part_size,
read_part_size: config.read_part_size,
write_part_size: config.write_part_size,
bucket_owner: config.bucket_owner,
credentials_provider: Some(credentials_provider),
host_resolver,
Expand Down Expand Up @@ -1145,11 +1166,16 @@ impl ObjectClient for S3CrtClient {
type PutObjectRequest = S3PutObjectRequest;
type ClientError = S3RequestError;

fn part_size(&self) -> Option<usize> {
// Always `Some`: reports the GET part size stored on the client's inner state.
fn read_part_size(&self) -> Option<usize> {
Some(self.inner.read_part_size)
}

fn write_part_size(&self) -> Option<usize> {
// TODO: the CRT does some clamping to a max size rather than just swallowing the part size
// we configured it with, so this might be wrong. Right now the only clamping is to the max
// S3 part size (5GiB), so this shouldn't affect the result.
Some(self.inner.part_size)
// https://github.com/awslabs/aws-c-s3/blob/94e3342c12833c5199/source/s3_client.c#L337-L344
Some(self.inner.write_part_size)
}

async fn delete_object(
Expand Down Expand Up @@ -1225,10 +1251,7 @@ mod tests {

/// Test explicit validation in [Client::new]
fn client_new_fails_with_invalid_part_size(part_size: usize) {
let config = S3ClientConfig {
part_size,
..Default::default()
};
let config = S3ClientConfig::default().part_size(part_size);
let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
let message = if cfg!(target_pointer_width = "64") {
"invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
Expand Down
12 changes: 8 additions & 4 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use mountpoint_s3_crt::s3::client::MetaRequestResult;
use pin_project::pin_project;

use crate::object_client::{ETag, GetBodyPart, GetObjectError, ObjectClientError, ObjectClientResult};
use crate::s3_crt_client::{GetObjectRequest, S3CrtClient, S3HttpRequest, S3Operation, S3RequestError};
use crate::s3_crt_client::{
GetObjectRequest, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Operation, S3RequestError,
};

impl S3CrtClient {
/// Create and begin a new GetObject request. The returned [GetObjectRequest] is a [Stream] of
Expand Down Expand Up @@ -59,10 +61,12 @@ impl S3CrtClient {

let (sender, receiver) = futures::channel::mpsc::unbounded();

let request = self.inner.make_meta_request(
message,
S3Operation::GetObject,
let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::GetObject);
options.part_size(self.inner.read_part_size as u64);
let request = self.inner.make_meta_request_from_options(
options,
span,
|_| (),
|_, _| (),
move |offset, data| {
let _ = sender.unbounded_send(Ok((offset, data.into())));
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl S3CrtClient {
let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject);
options.send_using_async_writes(true);
options.on_upload_review(move |review| callback.invoke(review));
options.part_size(self.inner.write_part_size as u64);

// Before the first write, we need to await for the multi-part upload to be created, so we can report errors.
// To do so, we need to detect one of two events (whichever comes first):
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/tests/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn test_get_object_backpressure(size: usize, range: Option<Range<u64>>) {
async fn verify_backpressure_get_object() {
let initial_window_size = 256;
let client: S3CrtClient = get_test_backpressure_client(initial_window_size);
let part_size = client.part_size().unwrap();
let part_size = client.read_part_size().unwrap();

let size = part_size * 2;
let range = 0..(part_size + 1) as u64;
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async fn test_put_object_write_cancelled() {
.expect("put_object should succeed");

// Write a multiple of `part_size` to ensure it will not complete immediately.
let full_size = client.part_size().unwrap() * 10;
let full_size = client.write_part_size().unwrap() * 10;
let buffer = vec![0u8; full_size];

// Complete one write to ensure the MPU was created and the buffer for the upload request is available.
Expand Down
26 changes: 24 additions & 2 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,34 @@ pub struct CliArgs {

#[clap(
long,
help = "Part size for multi-part GET and PUT",
help = "Part size for multi-part GET and PUT in bytes",
default_value = "8388608",
value_name = "SIZE",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
help_heading = CLIENT_OPTIONS_HEADER
)]
pub part_size: u64,

#[clap(
long,
help = "Part size for multi-part GET in bytes [default: 8388608]",
value_name = "SIZE",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
help_heading = CLIENT_OPTIONS_HEADER,
conflicts_with = "part_size",
)]
pub read_part_size: Option<u64>,

#[clap(
long,
help = "Part size for multi-part PUT in bytes [default: 8388608]",
value_name = "SIZE",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
help_heading = CLIENT_OPTIONS_HEADER,
conflicts_with = "part_size",
)]
pub write_part_size: Option<u64>,

#[clap(
long,
help = "Owner UID [default: current user's UID]",
Expand Down Expand Up @@ -618,7 +639,8 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
let mut client_config = S3ClientConfig::new()
.auth_config(auth_config)
.throughput_target_gbps(throughput_target_gbps)
.part_size(args.part_size as usize)
.read_part_size(args.read_part_size.unwrap_or(args.part_size) as usize)
.write_part_size(args.write_part_size.unwrap_or(args.part_size) as usize)
.user_agent(user_agent);
if args.requester_pays {
client_config = client_config.request_payer("requester");
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/prefetch/part_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
assert!(preferred_part_size > 0);
let request_range = range.align(client.part_size().unwrap_or(8 * 1024 * 1024) as u64, true);
let request_range = range.align(client.read_part_size().unwrap_or(8 * 1024 * 1024) as u64, true);
let start = request_range.start();
let size = request_range.len();

Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<Client: ObjectClient> UploadRequest<Client> {
let request = inner.client.put_object(bucket, key, &params).await?;
let maximum_upload_size = inner
.client
.part_size()
.write_part_size()
.map(|ps| ps.saturating_mul(MAX_S3_MULTIPART_UPLOAD_PARTS));

Ok(Self {
Expand Down
24 changes: 24 additions & 0 deletions mountpoint-s3/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,27 @@ fn sse_key_not_allowed_with_aes256() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

// Passing `--part-size` together with `--read-part-size` and/or `--write-part-size` must make
// the CLI exit with a failure whose stderr mentions the conflicting arguments.
#[test_case(Some(1024), Some(1024))]
#[test_case(None, Some(1024))]
#[test_case(Some(1024), None)]
fn verify_new_part_size_config_conflict_with_old_one(
    read_part_size: Option<u64>,
    write_part_size: Option<u64>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mount_point = assert_fs::TempDir::new()?;

    let mut command = Command::cargo_bin("mount-s3")?;
    command.arg("test-bucket");
    command.arg(mount_point.path());
    command.arg("--part-size=1024");

    // Append whichever of the direction-specific flags this case asks for (read first, then write).
    let split_flags = read_part_size
        .map(|size| format!("--read-part-size={}", size))
        .into_iter()
        .chain(write_part_size.map(|size| format!("--write-part-size={}", size)));
    command.args(split_flags);

    command
        .assert()
        .failure()
        .stderr(predicate::str::contains("cannot be used with"));

    Ok(())
}

0 comments on commit 0fff132

Please sign in to comment.