Add configurable Parquet Data Anonymization feature #24559

Open · wants to merge 1 commit into master
@@ -53,6 +53,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.AccessControlException;
import org.apache.parquet.anonymization.AnonymizationManager;
import org.apache.parquet.anonymization.AnonymizationManagerFactory;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.crypto.InternalFileDecryptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -86,8 +88,10 @@
import static com.facebook.presto.delta.DeltaErrorCode.DELTA_PARQUET_SCHEMA_MISMATCH;
import static com.facebook.presto.delta.DeltaTypeUtils.convertPartitionValue;
import static com.facebook.presto.hive.CacheQuota.NO_CACHE_CONSTRAINTS;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getParquetAnonymizationManagerClass;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getParquetMaxReadBlockSize;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getReadNullMaskedParquetEncryptedValue;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isParquetAnonymizationEnabled;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isParquetBatchReaderVerificationEnabled;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isParquetBatchReadsEnabled;
import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
@@ -280,6 +284,14 @@ private static ConnectorPageSource createParquetPageSource(
}
}
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
Optional<AnonymizationManager> anonymizationManager = Optional.empty();
if (isParquetAnonymizationEnabled(session)) {
anonymizationManager = AnonymizationManagerFactory
.getAnonymizationManager(
getParquetAnonymizationManagerClass(session),
configuration,
tableName.toString());
}
ParquetReader parquetReader = new ParquetReader(
messageColumnIO,
blocks.build(),
@@ -292,7 +304,8 @@ private static ConnectorPageSource createParquetPageSource(
parquetPredicate,
blockIndexStores,
false,
fileDecryptor);
fileDecryptor,
anonymizationManager);

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
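For context on the call above: AnonymizationManager and AnonymizationManagerFactory come from a patched Parquet dependency whose source is not part of this diff, so only the factory's call shape is visible here. Below is a minimal sketch of what such a factory might look like, assuming reflective loading and a public (Configuration, String tableName) constructor on implementations; both are assumptions inferred from the call sites, not the actual contract.

import java.util.Optional;
import org.apache.hadoop.conf.Configuration;

// Illustrative only: the real AnonymizationManagerFactory ships with the
// modified Parquet library; this signature is inferred from the call sites above.
public final class AnonymizationManagerFactory
{
    private AnonymizationManagerFactory() {}

    public static Optional<AnonymizationManager> getAnonymizationManager(
            String managerClass,
            Configuration configuration,
            String tableName)
    {
        if (managerClass == null || managerClass.isEmpty()) {
            return Optional.empty();
        }
        try {
            // Assumed convention: implementations expose a (Configuration, String) constructor
            return Optional.of(Class.forName(managerClass)
                    .asSubclass(AnonymizationManager.class)
                    .getConstructor(Configuration.class, String.class)
                    .newInstance(configuration, tableName));
        }
        catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load anonymization manager: " + managerClass, e);
        }
    }
}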
@@ -46,6 +46,8 @@ public class HiveCommonClientConfig
private boolean readNullMaskedParquetEncryptedValueEnabled;
private boolean useParquetColumnNames;
private boolean zstdJniDecompressionEnabled;
private boolean isParquetAnonymizationEnabled;
private String parquetAnonymizationManagerClass;

public NodeSelectionStrategy getNodeSelectionStrategy()
{
@@ -284,4 +286,30 @@ public HiveCommonClientConfig setZstdJniDecompressionEnabled(boolean zstdJniDecompressionEnabled)
this.zstdJniDecompressionEnabled = zstdJniDecompressionEnabled;
return this;
}

@Config("hive.enable-parquet-anonymization")
@ConfigDescription("enable parquet anonymization")
public HiveCommonClientConfig setParquetAnonymizationEnabled(boolean isParquetAnonymizationEnabled)
{
this.isParquetAnonymizationEnabled = isParquetAnonymizationEnabled;
return this;
}

public boolean isParquetAnonymizationEnabled()
{
return this.isParquetAnonymizationEnabled;
}

@Config("hive.parquet-anonymization-manager-class")
@ConfigDescription("Anonymization manager class, must be set if parquet anonymization is enabled")
public HiveCommonClientConfig setParquetAnonymizationManagerClass(String parquetAnonymizationManagerClass)
{
this.parquetAnonymizationManagerClass = parquetAnonymizationManagerClass;
return this;
}

public String getParquetAnonymizationManagerClass()
{
return this.parquetAnonymizationManagerClass;
}
}
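For reference, the two options above are wired through the Hive catalog configuration. An illustrative etc/catalog/hive.properties fragment follows; the manager class name is a placeholder, and any real implementation must be on the cluster classpath:

hive.enable-parquet-anonymization=true
hive.parquet-anonymization-manager-class=com.example.anonymization.MyAnonymizationManager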
@@ -61,6 +61,8 @@ public class HiveCommonSessionProperties
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_USE_COLUMN_NAMES = "parquet_use_column_names";
public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled";
public static final String ENABLE_PARQUET_ANONYMIZATION = "enable_parquet_anonymization";
public static final String PARQUET_ANONYMIZATION_MANAGER_CLASS = "parquet_anonymization_manager_class";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
@@ -177,6 +179,16 @@ public HiveCommonSessionProperties(HiveCommonClientConfig hiveCommonClientConfig
READ_MASKED_VALUE_ENABLED,
"Return null when access is denied for an encrypted parquet column",
hiveCommonClientConfig.getReadNullMaskedParquetEncryptedValue(),
false),
booleanProperty(
ENABLE_PARQUET_ANONYMIZATION,
"Is parquet anonymization enabled",
hiveCommonClientConfig.isParquetAnonymizationEnabled(),
false),
stringProperty(
PARQUET_ANONYMIZATION_MANAGER_CLASS,
"Parquet anonymization manager class",
hiveCommonClientConfig.getParquetAnonymizationManagerClass(),
false));
}

@@ -287,6 +299,16 @@ public static boolean getReadNullMaskedParquetEncryptedValue(ConnectorSession session)
return session.getProperty(READ_MASKED_VALUE_ENABLED, Boolean.class);
}

public static boolean isParquetAnonymizationEnabled(ConnectorSession session)
{
return session.getProperty(ENABLE_PARQUET_ANONYMIZATION, Boolean.class);
}

public static String getParquetAnonymizationManagerClass(ConnectorSession session)
{
return session.getProperty(PARQUET_ANONYMIZATION_MANAGER_CLASS, String.class);
}

public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
{
return new PropertyMetadata<>(
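Because the same switches are registered as session properties, they can also be toggled per query. Assuming a catalog named hive (an assumption) and the same placeholder implementation class:

SET SESSION hive.enable_parquet_anonymization = true;
SET SESSION hive.parquet_anonymization_manager_class = 'com.example.anonymization.MyAnonymizationManager';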
@@ -47,7 +47,9 @@ public void testDefaults()
.setZstdJniDecompressionEnabled(false)
.setParquetBatchReaderVerificationEnabled(false)
.setParquetBatchReadOptimizationEnabled(false)
.setReadNullMaskedParquetEncryptedValue(false));
.setReadNullMaskedParquetEncryptedValue(false)
.setParquetAnonymizationEnabled(false)
.setParquetAnonymizationManagerClass(null));
}

@Test
@@ -72,6 +74,9 @@ public void testExplicitPropertyMappings()
.put("hive.enable-parquet-batch-reader-verification", "true")
.put("hive.parquet-batch-read-optimization-enabled", "true")
.put("hive.read-null-masked-parquet-encrypted-value-enabled", "true")
.put("hive.enable-parquet-anonymization", "true")
.put("hive.parquet-anonymization-manager-class",
"org.apache.parquet.anonymization.TestAnonymizationManager")
.build();

HiveCommonClientConfig expected = new HiveCommonClientConfig()
@@ -92,7 +97,9 @@ public void testExplicitPropertyMappings()
.setZstdJniDecompressionEnabled(true)
.setParquetBatchReaderVerificationEnabled(true)
.setParquetBatchReadOptimizationEnabled(true)
.setReadNullMaskedParquetEncryptedValue(true);
.setReadNullMaskedParquetEncryptedValue(true)
.setParquetAnonymizationEnabled(true)
.setParquetAnonymizationManagerClass("org.apache.parquet.anonymization.TestAnonymizationManager");

ConfigAssertions.assertFullMapping(properties, expected);
}
@@ -48,6 +48,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.anonymization.AnonymizationManager;
import org.apache.parquet.anonymization.AnonymizationManagerFactory;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.crypto.DecryptionPropertiesFactory;
import org.apache.parquet.crypto.FileDecryptionProperties;
@@ -96,8 +98,10 @@
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveColumnHandle.getPushedDownSubfield;
import static com.facebook.presto.hive.HiveColumnHandle.isPushedDownSubfield;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getParquetAnonymizationManagerClass;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getParquetMaxReadBlockSize;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getReadNullMaskedParquetEncryptedValue;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isParquetAnonymizationEnabled;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isParquetBatchReaderVerificationEnabled;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isParquetBatchReadsEnabled;
import static com.facebook.presto.hive.HiveCommonSessionProperties.isUseParquetColumnNames;
@@ -254,6 +258,14 @@ public static ConnectorPageSource createParquetPageSource(
nextStart += block.getRowCount();
}
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
Optional<AnonymizationManager> anonymizationManager = Optional.empty();
if (isParquetAnonymizationEnabled(session)) {
anonymizationManager = AnonymizationManagerFactory
.getAnonymizationManager(
getParquetAnonymizationManagerClass(session),
configuration,
tableName.toString());
}
ParquetReader parquetReader = new ParquetReader(
messageColumnIO,
blocks.build(),
@@ -266,7 +278,8 @@ public static ConnectorPageSource createParquetPageSource(
parquetPredicate,
blockIndexStores,
columnIndexFilterEnabled,
fileDecryptor);
fileDecryptor,
anonymizationManager);

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
@@ -291,7 +291,7 @@ ParquetPageSource createParquetPageSource()
fields.add(ColumnIOConverter.constructField(getTypeFromTypeSignature(), messageColumnIO.getChild(i)));
}

ParquetReader parquetReader = new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), batchReadEnabled, enableVerification, null, null, false, Optional.empty());
ParquetReader parquetReader = new ParquetReader(messageColumnIO, parquetMetadata.getBlocks(), Optional.empty(), dataSource, newSimpleAggregatedMemoryContext(), new DataSize(16, MEGABYTE), batchReadEnabled, enableVerification, null, null, false, Optional.empty(), Optional.empty());
return new ParquetPageSource(parquetReader, Collections.nCopies(channelCount, type), fields, columnNames, new RuntimeStats());
}

@@ -530,7 +530,7 @@ private static void assertPageSource(List<Type> types, Iterator<?>[] valuesByFie
assertPageSource(types, valuesByField, pageSource, Optional.empty());
}

private static void assertPageSource(List<Type> types, Iterator<?>[] valuesByField, ConnectorPageSource pageSource, Optional<Long> maxReadBlockSize)
static void assertPageSource(List<Type> types, Iterator<?>[] valuesByField, ConnectorPageSource pageSource, Optional<Long> maxReadBlockSize)
{
Page page;
while ((page = pageSource.getNextPage()) != null) {
@@ -550,7 +550,7 @@ private static void assertRecordCursor(List<Type> types, Iterator<?>[] valuesByField, RecordCursor cursor)
}
}

private static void assertRecordCursor(List<Type> types, Iterator<?>[] valuesByField, RecordCursor cursor)
static void assertRecordCursor(List<Type> types, Iterator<?>[] valuesByField, RecordCursor cursor)
{
while (cursor.advanceNextPosition()) {
for (int field = 0; field < types.size(); field++) {