From 10809dfad63a899d9679f1fad269a7a2ddedcb61 Mon Sep 17 00:00:00 2001
From: Super User
Date: Sun, 16 Jun 2024 15:41:20 +0000
Subject: [PATCH] a sanity test for generating Parquet object using
 pandas/pyarrow libraries

Signed-off-by: Super User
---
 requirements.txt                          |  3 +
 s3tests_boto3/functional/test_s3select.py | 67 ++++++++++++++++++++++-
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 7742d8fb9..68c04a7d7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,3 +13,6 @@ httplib2
 lxml
 pytest
 tox
+pandas
+pyarrow
+
diff --git a/s3tests_boto3/functional/test_s3select.py b/s3tests_boto3/functional/test_s3select.py
index 1ce4fa3ea..65c38090d 100644
--- a/s3tests_boto3/functional/test_s3select.py
+++ b/s3tests_boto3/functional/test_s3select.py
@@ -4,7 +4,9 @@ import re
 import json
 
 from botocore.exceptions import ClientError
-
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 import uuid
 
 from . import (
@@ -107,6 +109,48 @@ def test_generate_projection():
     for _ in range(100):
         generate_s3select_expression_projection(bucket_name,obj_name)
 
+def create_parquet_object(parquet_size):
+    # Initialize lists with random integers
+    a = [random.randint(1, 10000) for _ in range(parquet_size)]
+    b = [random.randint(1, 10000) for _ in range(parquet_size)]
+    c = [random.randint(1, 10000) for _ in range(parquet_size)]
+    d = [random.randint(1, 10000) for _ in range(parquet_size)]
+
+    # Create DataFrame
+    df3 = pd.DataFrame({'a': a, 'b': b, 'c': c, 'd': d})
+
+    # Create Parquet object
+    table = pa.Table.from_pandas(df3, preserve_index=False)
+    obj = pa.BufferOutputStream()
+    pq.write_table(table, obj)
+
+    return obj.getvalue().to_pybytes()
+
+def upload_parquet_object(bucket_name,parquet_obj_name,obj):
+
+    client = get_client()
+    client.create_bucket(Bucket=bucket_name)
+    client.put_object(Bucket=bucket_name, Key=parquet_obj_name, Body=obj)
+
+@pytest.mark.s3select
+def test_parquet_lowerupper_expressions():
+
+    parquet_obj = create_parquet_object(1)
+
+    parquet_obj_name = "4col.parquet"
+    bucket_name = get_new_bucket_name()
+
+    upload_parquet_object(bucket_name,parquet_obj_name,parquet_obj)
+
+    res_s3select = run_s3select_parquet(bucket_name,parquet_obj_name,'select lower("AB12cd$$") from s3object ;')
+
+    s3select_assert_result( res_s3select, 'ab12cd$$\n')
+
+    res_s3select = run_s3select_parquet(bucket_name,parquet_obj_name,'select upper("ab12CD$$") from s3object ;')
+
+    s3select_assert_result( res_s3select, 'AB12CD$$\n')
+
+
 def s3select_assert_result(a,b):
     if type(a) == str:
         a_strip = a.strip()
@@ -338,6 +382,27 @@ def run_s3select_output(bucket,key,query, quot_field, op_column_delim = ",", op_
 
     return result
 
+def run_s3select_parquet(bucket,key,query, op_row_delim = "\n"):
+
+    s3 = get_client()
+
+    r = s3.select_object_content(
+        Bucket=bucket,
+        Key=key,
+        ExpressionType='SQL',
+        InputSerialization = {'Parquet': {}},
+        OutputSerialization = {"CSV": {}},
+        Expression=query,)
+    #Record delimiter optional in output serialization
+
+    result = ""
+    for event in r['Payload']:
+        if 'Records' in event:
+            records = event['Records']['Payload'].decode('utf-8')
+            result += records
+
+    return result
+
 def run_s3select_json(bucket,key,query, op_row_delim = "\n"):
 
     s3 = get_client()