diff --git a/s3tests.conf.SAMPLE b/s3tests.conf.SAMPLE index c0dc89ae..c0067e61 100644 --- a/s3tests.conf.SAMPLE +++ b/s3tests.conf.SAMPLE @@ -142,6 +142,13 @@ secret_key = aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa user_id = RGW11111111111111111 email = account1@ceph.com +# iam account non-root user in the same account [iam root] +[iam root alt] +access_key = CCCCCCCCCCCCCCCCCCcc +secret_key = cccccccccccccccccccccccccccccccccccccccc +user_id = testacct1user +email = account1@ceph.com + # iam account root user in a different account than [iam root] [iam alt root] access_key = BBBBBBBBBBBBBBBBBBbb diff --git a/s3tests_boto3/functional/__init__.py b/s3tests_boto3/functional/__init__.py index 2f9f7e10..7487f4b5 100644 --- a/s3tests_boto3/functional/__init__.py +++ b/s3tests_boto3/functional/__init__.py @@ -271,6 +271,11 @@ def configure(): config.iam_root_user_id = cfg.get('iam root',"user_id") config.iam_root_email = cfg.get('iam root',"email") + config.iam_root_alt_access_key = cfg.get('iam root alt',"access_key") + config.iam_root_alt_secret_key = cfg.get('iam root alt',"secret_key") + config.iam_root_alt_user_id = cfg.get('iam root alt',"user_id") + config.iam_root_alt_email = cfg.get('iam root alt',"email") + config.iam_alt_root_access_key = cfg.get('iam alt root',"access_key") config.iam_alt_root_secret_key = cfg.get('iam alt root',"secret_key") config.iam_alt_root_user_id = cfg.get('iam alt root',"user_id") @@ -461,6 +466,17 @@ def get_iam_root_client(**kwargs): verify=config.default_ssl_verify, **kwargs) +def get_iam_root_alt_client(**kwargs): + kwargs.setdefault('service_name', 's3') + kwargs.setdefault('aws_access_key_id', config.iam_root_alt_access_key) + kwargs.setdefault('aws_secret_access_key', config.iam_root_alt_secret_key) + + return boto3.client(endpoint_url=config.default_endpoint, + region_name='', + use_ssl=config.default_is_secure, + verify=config.default_ssl_verify, + **kwargs) + def get_iam_alt_root_client(**kwargs): kwargs.setdefault('service_name', 'iam') kwargs.setdefault('aws_access_key_id', config.iam_alt_root_access_key) diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 47cc5255..c801c5b1 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -66,6 +66,8 @@ get_alt_user_id, get_alt_email, get_alt_client, + get_iam_root_client, + get_iam_root_alt_client, get_tenant_client, get_tenant_iam_client, get_tenant_user_id, @@ -10403,6 +10405,83 @@ def test_bucketv2_policy(): response = alt_client.list_objects_v2(Bucket=bucket_name) assert len(response['Contents']) == 1 +@pytest.mark.bucket_policy +def test_bucket_policy_deny_self_denied_policy(): + root_client = get_iam_root_client(service_name="s3") + bucket_name = get_new_bucket(root_client) + + resource1 = "arn:aws:s3:::" + bucket_name + resource2 = "arn:aws:s3:::" + bucket_name + "/*" + policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Deny", + "Principal": "*", + "Action": [ + "s3:PutBucketPolicy", + "s3:GetBucketPolicy", + "s3:DeleteBucketPolicy", + ], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + + root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + + # non-root account should not be able to get, put or delete bucket policy + root_alt_client = get_iam_root_alt_client() + check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name) + check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name) + check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document) + + # root account should be able to get, put or delete bucket policy + response = root_client.get_bucket_policy(Bucket=bucket_name) + assert response['Policy'] == policy_document + root_client.delete_bucket_policy(Bucket=bucket_name) + root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + +@pytest.mark.bucket_policy +def test_bucket_policy_deny_self_denied_policy_confirm_header(): + root_client = get_iam_root_client(service_name="s3") + bucket_name = get_new_bucket(root_client) + + resource1 = "arn:aws:s3:::" + bucket_name + resource2 = "arn:aws:s3:::" + bucket_name + "/*" + policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Deny", + "Principal": "*", + "Action": [ + "s3:PutBucketPolicy", + "s3:GetBucketPolicy", + "s3:DeleteBucketPolicy", + ], + "Resource": [ + "{}".format(resource1), + "{}".format(resource2) + ] + }] + }) + + root_client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document, ConfirmRemoveSelfBucketAccess=True) + + # non-root account should not be able to get, put or delete bucket policy + root_alt_client = get_iam_root_alt_client() + check_access_denied(root_alt_client.get_bucket_policy, Bucket=bucket_name) + check_access_denied(root_alt_client.delete_bucket_policy, Bucket=bucket_name) + check_access_denied(root_alt_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document) + + # root account should not be able to get, put or delete bucket policy + check_access_denied(root_client.get_bucket_policy, Bucket=bucket_name) + check_access_denied(root_client.delete_bucket_policy, Bucket=bucket_name) + check_access_denied(root_client.put_bucket_policy, Bucket=bucket_name, Policy=policy_document) + @pytest.mark.bucket_policy def test_bucket_policy_acl(): bucket_name = get_new_bucket()