Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add arguments to specify GET and PUT part size independently #949

Merged
merged 6 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ where
type PutObjectRequest = FailurePutObjectRequest<Client, GetWrapperState>;
type ClientError = Client::ClientError;

fn part_size(&self) -> Option<usize> {
self.client.part_size()
fn read_part_size(&self) -> Option<usize> {
self.client.read_part_size()
}

fn write_part_size(&self) -> Option<usize> {
self.client.write_part_size()
}

async fn delete_object(
Expand Down
6 changes: 5 additions & 1 deletion mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,11 @@ impl ObjectClient for MockClient {
type PutObjectRequest = MockPutObjectRequest;
type ClientError = MockClientError;

fn part_size(&self) -> Option<usize> {
fn read_part_size(&self) -> Option<usize> {
Some(self.config.part_size)
}

fn write_part_size(&self) -> Option<usize> {
Some(self.config.part_size)
}

Expand Down
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ impl ObjectClient for ThroughputMockClient {
type PutObjectRequest = MockPutObjectRequest;
type ClientError = MockClientError;

fn part_size(&self) -> Option<usize> {
self.inner.part_size()
fn read_part_size(&self) -> Option<usize> {
self.inner.read_part_size()
}

fn write_part_size(&self) -> Option<usize> {
self.inner.write_part_size()
}

async fn delete_object(
Expand Down
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,13 @@ pub trait ObjectClient {
type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;

/// Query the part size this client uses for PUT and GET operations to the object store. This
/// Query the part size this client uses for GET operations to the object store. This
/// can be `None` if the client does not do multi-part operations.
fn part_size(&self) -> Option<usize>;
fn read_part_size(&self) -> Option<usize>;

/// Query the part size this client uses for PUT operations to the object store. This
/// can be `None` if the client does not do multi-part operations.
fn write_part_size(&self) -> Option<usize>;

/// Delete a single object from the object store.
///
Expand Down
57 changes: 40 additions & 17 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ macro_rules! event {
pub struct S3ClientConfig {
auth_config: S3ClientAuthConfig,
throughput_target_gbps: f64,
part_size: usize,
read_part_size: usize,
write_part_size: usize,
endpoint_config: EndpointConfig,
user_agent: Option<UserAgent>,
request_payer: Option<String>,
Expand All @@ -101,7 +102,8 @@ impl Default for S3ClientConfig {
Self {
auth_config: Default::default(),
throughput_target_gbps: 10.0,
part_size: DEFAULT_PART_SIZE,
read_part_size: DEFAULT_PART_SIZE,
write_part_size: DEFAULT_PART_SIZE,
endpoint_config: EndpointConfig::new("us-east-1"),
user_agent: None,
request_payer: None,
Expand All @@ -128,7 +130,22 @@ impl S3ClientConfig {
/// Set the part size for multi-part operations to S3 (both PUT and GET)
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn part_size(mut self, part_size: usize) -> Self {
self.part_size = part_size;
self.read_part_size = part_size;
self.write_part_size = part_size;
self
}

/// Set the part size for multi-part-get operations to S3.
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn read_part_size(mut self, size: usize) -> Self {
self.read_part_size = size;
self
}

/// Set the part size for multi-part-put operations to S3.
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn write_part_size(mut self, size: usize) -> Self {
self.write_part_size = size;
self
}

Expand Down Expand Up @@ -248,7 +265,8 @@ struct S3CrtClientInner {
/// Here it will add the user agent prefix and s3 client information.
user_agent_header: String,
request_payer: Option<String>,
part_size: usize,
read_part_size: usize,
write_part_size: usize,
bucket_owner: Option<String>,
credentials_provider: Option<CredentialsProvider>,
host_resolver: HostResolver,
Expand Down Expand Up @@ -352,13 +370,15 @@ impl S3CrtClientInner {

// max_part_size is 5GB or less depending on the platform (4GB on 32-bit)
let max_part_size = cmp::min(5_u64 * 1024 * 1024 * 1024, usize::MAX as u64) as usize;
if !(5 * 1024 * 1024..=max_part_size).contains(&config.part_size) {
return Err(NewClientError::InvalidConfiguration(format!(
"part size must be at between 5MiB and {}GiB",
max_part_size / 1024 / 1024 / 1024
)));
// TODO: Review the part size validation for read_part_size, read_part_size can have a more relax limit.
for part_size in [config.read_part_size, config.write_part_size] {
if !(5 * 1024 * 1024..=max_part_size).contains(&part_size) {
return Err(NewClientError::InvalidConfiguration(format!(
"part size must be at between 5MiB and {}GiB",
max_part_size / 1024 / 1024 / 1024
)));
}
}
client_config.part_size(config.part_size);

let user_agent = config.user_agent.unwrap_or_else(|| UserAgent::new(None));
let user_agent_header = user_agent.build();
Expand All @@ -373,7 +393,8 @@ impl S3CrtClientInner {
next_request_counter: AtomicU64::new(0),
user_agent_header,
request_payer: config.request_payer,
part_size: config.part_size,
read_part_size: config.read_part_size,
write_part_size: config.write_part_size,
bucket_owner: config.bucket_owner,
credentials_provider: Some(credentials_provider),
host_resolver,
Expand Down Expand Up @@ -1145,11 +1166,16 @@ impl ObjectClient for S3CrtClient {
type PutObjectRequest = S3PutObjectRequest;
type ClientError = S3RequestError;

fn part_size(&self) -> Option<usize> {
fn read_part_size(&self) -> Option<usize> {
Some(self.inner.read_part_size)
}

fn write_part_size(&self) -> Option<usize> {
// TODO: the CRT does some clamping to a max size rather than just swallowing the part size
// we configured it with, so this might be wrong. Right now the only clamping is to the max
// S3 part size (5GiB), so this shouldn't affect the result.
Some(self.inner.part_size)
// https://github.com/awslabs/aws-c-s3/blob/94e3342c12833c5199/source/s3_client.c#L337-L344
Some(self.inner.write_part_size)
}

async fn delete_object(
Expand Down Expand Up @@ -1225,10 +1251,7 @@ mod tests {

/// Test explicit validation in [Client::new]
fn client_new_fails_with_invalid_part_size(part_size: usize) {
let config = S3ClientConfig {
part_size,
..Default::default()
};
let config = S3ClientConfig::default().part_size(part_size);
let e = S3CrtClient::new(config).expect_err("creating a new client should fail");
let message = if cfg!(target_pointer_width = "64") {
"invalid configuration: part size must be at between 5MiB and 5GiB".to_string()
Expand Down
12 changes: 8 additions & 4 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use mountpoint_s3_crt::s3::client::MetaRequestResult;
use pin_project::pin_project;

use crate::object_client::{ETag, GetBodyPart, GetObjectError, ObjectClientError, ObjectClientResult};
use crate::s3_crt_client::{GetObjectRequest, S3CrtClient, S3HttpRequest, S3Operation, S3RequestError};
use crate::s3_crt_client::{
GetObjectRequest, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Operation, S3RequestError,
};

impl S3CrtClient {
/// Create and begin a new GetObject request. The returned [GetObjectRequest] is a [Stream] of
Expand Down Expand Up @@ -59,10 +61,12 @@ impl S3CrtClient {

let (sender, receiver) = futures::channel::mpsc::unbounded();

let request = self.inner.make_meta_request(
message,
S3Operation::GetObject,
let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::GetObject);
options.part_size(self.inner.read_part_size as u64);
let request = self.inner.make_meta_request_from_options(
options,
span,
|_| (),
|_, _| (),
move |offset, data| {
let _ = sender.unbounded_send(Ok((offset, data.into())));
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl S3CrtClient {
let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject);
options.send_using_async_writes(true);
options.on_upload_review(move |review| callback.invoke(review));
options.part_size(self.inner.write_part_size as u64);

// Before the first write, we need to await for the multi-part upload to be created, so we can report errors.
// To do so, we need to detect one of two events (whichever comes first):
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/tests/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn test_get_object_backpressure(size: usize, range: Option<Range<u64>>) {
async fn verify_backpressure_get_object() {
let initial_window_size = 256;
let client: S3CrtClient = get_test_backpressure_client(initial_window_size);
let part_size = client.part_size().unwrap();
let part_size = client.read_part_size().unwrap();

let size = part_size * 2;
let range = 0..(part_size + 1) as u64;
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ async fn test_put_object_write_cancelled() {
.expect("put_object should succeed");

// Write a multiple of `part_size` to ensure it will not complete immediately.
let full_size = client.part_size().unwrap() * 10;
let full_size = client.write_part_size().unwrap() * 10;
let buffer = vec![0u8; full_size];

// Complete one write to ensure the MPU was created and the buffer for the upload request is available.
Expand Down
26 changes: 24 additions & 2 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,34 @@ pub struct CliArgs {

#[clap(
long,
help = "Part size for multi-part GET and PUT",
help = "Part size for multi-part GET and PUT in bytes",
default_value = "8388608",
value_name = "SIZE",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
help_heading = CLIENT_OPTIONS_HEADER
)]
pub part_size: u64,
crrow marked this conversation as resolved.
Show resolved Hide resolved

#[clap(
long,
help = "Part size for multi-part GET in bytes [default: 8388608]",
value_name = "SIZE",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
help_heading = CLIENT_OPTIONS_HEADER,
conflicts_with = "part_size",
)]
pub read_part_size: Option<u64>,

#[clap(
long,
help = "Part size for multi-part PUT in bytes [default: 8388608]",
value_name = "SIZE",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
help_heading = CLIENT_OPTIONS_HEADER,
conflicts_with = "part_size",
)]
pub write_part_size: Option<u64>,

#[clap(
long,
help = "Owner UID [default: current user's UID]",
Expand Down Expand Up @@ -618,7 +639,8 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
let mut client_config = S3ClientConfig::new()
.auth_config(auth_config)
.throughput_target_gbps(throughput_target_gbps)
.part_size(args.part_size as usize)
.read_part_size(args.read_part_size.unwrap_or(args.part_size) as usize)
.write_part_size(args.write_part_size.unwrap_or(args.part_size) as usize)
.user_agent(user_agent);
if args.requester_pays {
client_config = client_config.request_payer("requester");
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/prefetch/part_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
assert!(preferred_part_size > 0);
let request_range = range.align(client.part_size().unwrap_or(8 * 1024 * 1024) as u64, true);
let request_range = range.align(client.read_part_size().unwrap_or(8 * 1024 * 1024) as u64, true);
let start = request_range.start();
let size = request_range.len();

Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<Client: ObjectClient> UploadRequest<Client> {
let request = inner.client.put_object(bucket, key, &params).await?;
let maximum_upload_size = inner
.client
.part_size()
.write_part_size()
.map(|ps| ps.saturating_mul(MAX_S3_MULTIPART_UPLOAD_PARTS));

Ok(Self {
Expand Down
24 changes: 24 additions & 0 deletions mountpoint-s3/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,27 @@ fn sse_key_not_allowed_with_aes256() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

#[test_case(Some(1024), Some(1024))]
#[test_case(None, Some(1024))]
#[test_case(Some(1024), None)]
fn verify_new_part_size_config_conflict_with_old_one(
read_part_size: Option<u64>,
write_part_size: Option<u64>,
) -> Result<(), Box<dyn std::error::Error>> {
let dir = assert_fs::TempDir::new()?;
let mut cmd = Command::cargo_bin("mount-s3")?;
cmd.arg("test-bucket").arg(dir.path()).arg("--part-size=1024");

if let Some(read_part_size) = read_part_size {
cmd.arg(format!("--read-part-size={}", read_part_size));
}
if let Some(write_part_size) = write_part_size {
cmd.arg(format!("--write-part-size={}", write_part_size));
}

let error_message = "cannot be used with";
cmd.assert().failure().stderr(predicate::str::contains(error_message));

Ok(())
}
Loading