Skip to content

Commit

Permalink
Merge pull request #606 from yuvalif/wip-yuval-bucket-logging-flush
Browse files Browse the repository at this point in the history
rgw/logging: tests for flush API

Reviewed-By: Ali Masarwe <[email protected]>
  • Loading branch information
yuvalif authored Dec 18, 2024
2 parents 14768d8 + 79169a8 commit ae8bebd
Showing 1 changed file with 189 additions and 27 deletions.
216 changes: 189 additions & 27 deletions s3tests_boto3/functional/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -14017,7 +14017,7 @@ def _parse_standard_log_record(record):
record = record.replace('[', '"').replace(']', '"')
chunks = shlex.split(record)
assert len(chunks) == 26
return json.dumps({
return {
'BucketOwner': chunks[0],
'BucketName': chunks[1],
'RequestDateTime': chunks[2],
Expand All @@ -14044,14 +14044,14 @@ def _parse_standard_log_record(record):
'TLSVersion': chunks[23],
'AccessPointARN': chunks[24],
'ACLRequired': chunks[25],
}, indent=4)
}


def _parse_journal_log_record(record):
record = record.replace('[', '"').replace(']', '"')
chunks = shlex.split(record)
assert len(chunks) == 8
return json.dumps({
return {
'BucketOwner': chunks[0],
'BucketName': chunks[1],
'RequestDateTime': chunks[2],
Expand All @@ -14060,7 +14060,7 @@ def _parse_journal_log_record(record):
'ObjectSize': chunks[5],
'VersionID': chunks[6],
'ETAG': chunks[7],
}, indent=4)
}

def _parse_log_record(record, record_type):
if record_type == 'Standard':
Expand All @@ -14072,6 +14072,36 @@ def _parse_log_record(record, record_type):

expected_object_roll_time = 5

import logging

logger = logging.getLogger(__name__)


def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count, exact_match=False):
keys_found = []
all_keys = []
for record in iter(records.splitlines()):
parsed_record = _parse_log_record(record, record_type)
logger.info('bucket log record: %s', json.dumps(parsed_record, indent=4))
if bucket_name in record and event_type in record:
all_keys.append(parsed_record['Key'])
for key in src_keys:
if key in record:
keys_found.append(key)
break
logger.info('keys found in bucket log: %s', str(all_keys))
logger.info('keys from the source bucket: %s', str(src_keys))
if exact_match:
return len(keys_found) == expected_count and len(keys_found) == len(all_keys)
return len(keys_found) == expected_count


def randcontent():
letters = string.ascii_lowercase
length = random.randint(10, 1024)
return ''.join(random.choice(letters) for i in range(length))


@pytest.mark.bucket_logging
def test_put_bucket_logging():
src_bucket_name = get_new_bucket_name()
Expand Down Expand Up @@ -14176,6 +14206,161 @@ def test_put_bucket_logging():
assert response['LoggingEnabled'] == logging_enabled


def _bucket_logging_key_filter(log_type):
src_bucket_name = get_new_bucket_name()
src_bucket = get_new_bucket_resource(name=src_bucket_name)
log_bucket_name = get_new_bucket_name()
log_bucket = get_new_bucket_resource(name=log_bucket_name)
client = get_client()

logging_enabled = {
'TargetBucket': log_bucket_name,
'LoggingType': log_type,
'TargetPrefix': 'log/',
'ObjectRollTime': expected_object_roll_time,
'TargetObjectKeyFormat': {'SimplePrefix': {}},
'RecordsBatchSize': 0,
'Filter':
{
'Key': {
'FilterRules': [
{'Name': 'prefix', 'Value': 'test/'},
{'Name': 'suffix', 'Value': '.txt'}
]
}
}
}

response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
'LoggingEnabled': logging_enabled,
})
assert response['ResponseMetadata']['HTTPStatusCode'] == 200

response = client.get_bucket_logging(Bucket=src_bucket_name)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
if log_type == 'Journal':
assert response['LoggingEnabled'] == logging_enabled
elif log_type == 'Standard':
print('TODO')
else:
assert False, 'unknown log type: %s' % log_type

names = []
num_keys = 5
for j in range(num_keys):
name = 'myobject'+str(j)
if log_type == 'Standard':
# standard log records are not filtered
names.append(name)
client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())

for j in range(num_keys):
name = 'test/'+'myobject'+str(j)+'.txt'
names.append(name)
client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())

expected_count = len(names)

time.sleep(expected_object_roll_time)
client.put_object(Bucket=src_bucket_name, Key='test/dummy.txt', Body='dummy')

response = client.list_objects_v2(Bucket=log_bucket_name)
keys = _get_keys(response)
assert len(keys) == 1

for key in keys:
assert key.startswith('log/')
response = client.get_object(Bucket=log_bucket_name, Key=key)
body = _get_body(response)
assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', names, log_type, expected_count, exact_match=True)


@pytest.mark.bucket_logging
@pytest.mark.fails_on_aws
def test_bucket_logging_key_filter_s():
if not _has_bucket_logging_extension():
pytest.skip('ceph extension to bucket logging not supported at client')
_bucket_logging_key_filter('Standard')


@pytest.mark.bucket_logging
@pytest.mark.fails_on_aws
def test_bucket_logging_key_filter_j():
if not _has_bucket_logging_extension():
pytest.skip('ceph extension to bucket logging not supported at client')
_bucket_logging_key_filter('Journal')


def _bucket_logging_flush(log_type):
src_bucket_name = get_new_bucket_name()
src_bucket = get_new_bucket_resource(name=src_bucket_name)
log_bucket_name = get_new_bucket_name()
log_bucket = get_new_bucket_resource(name=log_bucket_name)
client = get_client()

logging_enabled = {
'TargetBucket': log_bucket_name,
'LoggingType': log_type,
'TargetPrefix': 'log/',
'ObjectRollTime': 300, # 5 minutes
'TargetObjectKeyFormat': {'SimplePrefix': {}},
'RecordsBatchSize': 0,
}

response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={
'LoggingEnabled': logging_enabled,
})
assert response['ResponseMetadata']['HTTPStatusCode'] == 200

response = client.get_bucket_logging(Bucket=src_bucket_name)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200
if log_type == 'Journal':
assert response['LoggingEnabled'] == logging_enabled
elif log_type == 'Standard':
print('TODO')
else:
assert False, 'unknown log type: %s' % log_type

num_keys = 5
for j in range(num_keys):
name = 'myobject'+str(j)
client.put_object(Bucket=src_bucket_name, Key=name, Body=randcontent())

response = client.list_objects_v2(Bucket=src_bucket_name)
src_keys = _get_keys(response)

response = client.post_bucket_logging(Bucket=src_bucket_name)
assert response['ResponseMetadata']['HTTPStatusCode'] == 200

expected_count = num_keys

response = client.list_objects_v2(Bucket=log_bucket_name)
keys = _get_keys(response)
assert len(keys) == 1

for key in keys:
assert key.startswith('log/')
response = client.get_object(Bucket=log_bucket_name, Key=key)
body = _get_body(response)
assert _verify_records(body, src_bucket_name, 'REST.PUT.OBJECT', src_keys, log_type, expected_count)


@pytest.mark.bucket_logging
@pytest.mark.fails_on_aws
def test_bucket_logging_flush_j():
if not _has_bucket_logging_extension():
pytest.skip('ceph extension to bucket logging not supported at client')
_bucket_logging_flush('Journal')


@pytest.mark.bucket_logging
@pytest.mark.fails_on_aws
def test_bucket_logging_flush_s():
if not _has_bucket_logging_extension():
pytest.skip('ceph extension to bucket logging not supported at client')
_bucket_logging_flush('Standard')


@pytest.mark.bucket_logging
def test_put_bucket_logging_errors():
src_bucket_name = get_new_bucket_name()
Expand Down Expand Up @@ -14305,29 +14490,6 @@ def test_put_bucket_logging_extensions():
assert response['LoggingEnabled'] == logging_enabled


import logging

logger = logging.getLogger(__name__)

def _verify_records(records, bucket_name, event_type, src_keys, record_type, expected_count):
keys_found = []
for record in iter(records.splitlines()):
logger.info('bucket log record: %s', _parse_log_record(record, record_type))
if bucket_name in record and event_type in record:
for key in src_keys:
if key in record:
keys_found.append(key)
break
logger.info('keys found in bucket log: %s', str(keys_found))
logger.info('keys from the source bucket: %s', str(src_keys))
return len(keys_found) == expected_count

def randcontent():
letters = string.ascii_lowercase
length = random.randint(10, 1024)
return ''.join(random.choice(letters) for i in range(length))


def _bucket_logging_put_objects(versioned):
src_bucket_name = get_new_bucket()
if versioned:
Expand Down

0 comments on commit ae8bebd

Please sign in to comment.