Prune add file entry using column stats in Delta checkpoint iterator #25311

Closed
wants to merge 1 commit into from
@@ -1315,6 +1315,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getEnforcedPartitionConstraint(),
tableHandle.getNonPartitionConstraint(),
Contributor:

Can we see a test in TestDeltaLakeFileOperations to showcase, at the file access level, the effectiveness of this contribution?

Contributor (author):

It might be hard to observe a noticeable difference in file access, because we already prune AddFile entries using column statistics during split generation, specifically in DeltaLakeSplitManager#getSplits and FileBasedTableStatisticsProvider#getTableStatistics.
This commit applies that pruning earlier, filtering out AddFile entries that don't satisfy the predicates while reading from the checkpoint. What we can observe is that the number of AddFile entries produced from the checkpoint shrinks, which is what the added test shows.
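
To make that concrete, here is a simplified sketch of the stats-based check that decides whether an AddFile entry survives the checkpoint read. The helper name `shouldKeepAddEntry` is made up for illustration and does not exist in the codebase; `createStatisticsPredicate`, `AddFileEntry#getStats`, `TupleDomain#isAll`, and `TupleDomain#overlaps` are used as they appear in this PR, and the wiring inside `CheckpointEntryIterator` is omitted.

```java
// Illustrative helper only, not part of the actual change. Assumes the imports and
// types of the Delta Lake connector (AddFileEntry, DeltaLakeColumnHandle,
// DeltaLakeColumnMetadata, TupleDomain) and that createStatisticsPredicate from
// DeltaLakeMetadata is in scope.
private static boolean shouldKeepAddEntry(
        AddFileEntry addEntry,
        TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
        List<DeltaLakeColumnMetadata> predicatedColumns,
        List<String> canonicalPartitionColumns)
{
    if (nonPartitionConstraint.isAll()) {
        // No column predicates on the query, so nothing can be pruned
        return true;
    }
    // Turn the file's min/max column statistics into a TupleDomain
    TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
            addEntry.getStats(),
            predicatedColumns,
            canonicalPartitionColumns);
    // Keep the entry only if its statistics could still satisfy the query predicate
    return nonPartitionConstraint.overlaps(statisticsPredicate);
}
```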

tableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
while (addFileEntryIterator.hasNext()) {
@@ -1713,6 +1714,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
deltaLakeTableHandle.getMetadataEntry(),
deltaLakeTableHandle.getProtocolEntry(),
deltaLakeTableHandle.getEnforcedPartitionConstraint(),
deltaLakeTableHandle.getNonPartitionConstraint(),
deltaLakeTableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
while (addFileEntryIterator.hasNext()) {
@@ -2621,6 +2623,7 @@ private Map<String, DeletionVectorEntry> loadDeletionVectors(ConnectorSession se
handle.getMetadataEntry(),
handle.getProtocolEntry(),
handle.getEnforcedPartitionConstraint(),
handle.getNonPartitionConstraint(),
handle.getProjectedColumns().orElse(ImmutableSet.of()))) {
Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
while (addFileEntryIterator.hasNext()) {
@@ -4359,6 +4362,7 @@ private Stream<AddFileEntry> getAddFileEntriesMatchingEnforcedPartitionConstrain
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getEnforcedPartitionConstraint(),
tableHandle.getNonPartitionConstraint(),
tableHandle.getProjectedColumns().orElse(ImmutableSet.of()));
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
if (enforcedPartitionConstraint.isAll()) {
@@ -4480,11 +4484,11 @@ public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle ta
}

public static TupleDomain<DeltaLakeColumnHandle> createStatisticsPredicate(
AddFileEntry addFileEntry,
Optional<? extends DeltaLakeFileStatistics> stats,
List<DeltaLakeColumnMetadata> schema,
List<String> canonicalPartitionColumns)
{
return addFileEntry.getStats()
return stats
.map(deltaLakeFileStatistics -> withColumnDomains(
schema.stream()
.filter(column -> canUseInPredicate(column.columnMetadata()))

@@ -164,6 +164,7 @@ private Stream<DeltaLakeSplit> getSplits(
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getEnforcedPartitionConstraint(),
tableHandle.getNonPartitionConstraint(),
tableHandle.getProjectedColumns().orElse(ImmutableSet.of()));
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
@@ -232,7 +233,7 @@ private Stream<DeltaLakeSplit> getSplits(
}

TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
addAction,
addAction.getStats(),
predicatedColumns,
metadataEntry.getLowercasePartitionColumns());
if (!nonPartitionConstraint.overlaps(statisticsPredicate)) {

@@ -131,6 +131,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getEnforcedPartitionConstraint(),
tableHandle.getNonPartitionConstraint(),
tableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
Iterator<AddFileEntry> addEntryIterator = addEntries.iterator();
while (addEntryIterator.hasNext()) {
@@ -159,7 +160,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
}

TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
addEntry,
addEntry.getStats(),
predicatedColumns,
tableHandle.getMetadataEntry().getLowercasePartitionColumns());
if (!tableHandle.getNonPartitionConstraint().overlaps(statisticsPredicate)) {

@@ -96,6 +96,11 @@ public AddFileEntry(
this.tags = tags;
this.deletionVector = requireNonNull(deletionVector, "deletionVector is null");

this.parsedStats = getDeltaLakeFileStatistics(stats, parsedStats);
}

public static Optional<? extends DeltaLakeFileStatistics> getDeltaLakeFileStatistics(Optional<String> stats, Optional<DeltaLakeParquetFileStatistics> parsedStats)
{
Optional<? extends DeltaLakeFileStatistics> resultParsedStats = Optional.empty();
if (parsedStats.isPresent()) {
resultParsedStats = parsedStats;
@@ -111,7 +116,7 @@ else if (stats.isPresent()) {
stats.get());
}
}
this.parsedStats = resultParsedStats;
return resultParsedStats;
}

/**

@@ -230,7 +230,7 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
TrinoFileSystem fileSystem,
FileFormatDataSourceStats stats,
Optional<MetadataAndProtocolEntry> metadataAndProtocol,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
TupleDomain<DeltaLakeColumnHandle> constraint,
Optional<Predicate<String>> addStatsMinMaxColumnFilter,
Executor executor)
throws IOException
@@ -259,7 +259,7 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
stats,
checkpoint,
checkpointFile,
partitionConstraint,
constraint,
addStatsMinMaxColumnFilter,
executor));
}
@@ -280,7 +280,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
FileFormatDataSourceStats stats,
LastCheckpoint checkpoint,
TrinoInputFile checkpointFile,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
TupleDomain<DeltaLakeColumnHandle> constraint,
Optional<Predicate<String>> addStatsMinMaxColumnFilter,
Executor executor)
{
@@ -305,7 +305,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
stats,
checkpoint,
checkpointFile,
partitionConstraint,
constraint,
addStatsMinMaxColumnFilter,
fileSystem,
fileSize,
@@ -324,7 +324,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold,
partitionConstraint,
constraint,
addStatsMinMaxColumnFilter);
return stream(checkpointEntryIterator).onClose(checkpointEntryIterator::close);
}
@@ -339,7 +339,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointTransactionLogEntrie
FileFormatDataSourceStats stats,
LastCheckpoint checkpoint,
TrinoInputFile checkpointFile,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
TupleDomain<DeltaLakeColumnHandle> constraint,
Optional<Predicate<String>> addStatsMinMaxColumnFilter,
TrinoFileSystem fileSystem,
long fileSize,
@@ -348,7 +348,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointTransactionLogEntrie
// Sidecar files contain only ADD and REMOVE entry types. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec
Set<CheckpointEntryIterator.EntryType> dataEntryTypes = Sets.intersection(entryTypes, Set.of(ADD, REMOVE));
List<CompletableFuture<Stream<DeltaLakeTransactionLogEntry>>> logEntryStreamFutures =
getV2CheckpointEntries(session, entryTypes, metadataEntry, protocolEntry, checkpointSchemaManager, typeManager, stats, checkpoint, checkpointFile, partitionConstraint, addStatsMinMaxColumnFilter, fileSystem, fileSize)
getV2CheckpointEntries(session, entryTypes, metadataEntry, protocolEntry, checkpointSchemaManager, typeManager, stats, checkpoint, checkpointFile, constraint, addStatsMinMaxColumnFilter, fileSystem, fileSize)
.map(v2checkpointEntry -> {
if (v2checkpointEntry.getSidecar() == null || dataEntryTypes.isEmpty()) {
return CompletableFuture.completedFuture(Stream.of(v2checkpointEntry));
@@ -369,7 +369,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointTransactionLogEntrie
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold,
partitionConstraint,
constraint,
addStatsMinMaxColumnFilter);
return stream(iterator).onClose(iterator::close);
}, executor);
@@ -391,7 +391,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointEntries(
FileFormatDataSourceStats stats,
LastCheckpoint checkpoint,
TrinoInputFile checkpointFile,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
TupleDomain<DeltaLakeColumnHandle> constraint,
Optional<Predicate<String>> addStatsMinMaxColumnFilter,
TrinoFileSystem fileSystem,
long fileSize)
@@ -423,7 +423,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointEntries(
parquetReaderOptions,
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold,
partitionConstraint,
constraint,
addStatsMinMaxColumnFilter);
return stream(checkpointEntryIterator)
.onClose(checkpointEntryIterator::close);

@@ -344,26 +344,27 @@ public Stream<AddFileEntry> getActiveFiles(
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Set<DeltaLakeColumnHandle> projectedColumns)
{
Set<String> baseColumnNames = projectedColumns.stream()
.filter(DeltaLakeColumnHandle::isBaseColumn) // Only base column stats are supported
.map(DeltaLakeColumnHandle::columnName)
.collect(toImmutableSet());
return getActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, baseColumnNames::contains);
return getActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint.intersect(nonPartitionConstraint), baseColumnNames::contains);
}

public Stream<AddFileEntry> getActiveFiles(
ConnectorSession session,
TableSnapshot tableSnapshot,
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
TupleDomain<DeltaLakeColumnHandle> constraint,
Predicate<String> addStatsMinMaxColumnFilter)
{
try {
if (isCheckpointFilteringEnabled(session)) {
return loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, addStatsMinMaxColumnFilter);
return loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, constraint, addStatsMinMaxColumnFilter);
}

TableVersion tableVersion = new TableVersion(new TableLocation(tableSnapshot.getTable(), tableSnapshot.getTableLocation()), tableSnapshot.getVersion());
@@ -411,7 +412,7 @@ public Stream<AddFileEntry> loadActiveFiles(
TableSnapshot tableSnapshot,
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
TupleDomain<DeltaLakeColumnHandle> constraint,
Predicate<String> addStatsMinMaxColumnFilter)
{
List<Transaction> transactions = tableSnapshot.getTransactions();
@@ -424,13 +425,13 @@
fileSystem,
fileFormatDataSourceStats,
Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)),
partitionConstraint,
constraint,
Optional.of(addStatsMinMaxColumnFilter),
new BoundedExecutor(executorService, checkpointProcessingParallelism))) {
return activeAddEntries(checkpointEntries, transactions, fileSystem)
.filter(partitionConstraint.isAll()
.filter(constraint.isAll()
? addAction -> true
: addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), partitionConstraint.getDomains().orElseThrow()));
: addAction -> partitionMatchesPredicate(addAction.getCanonicalPartitionValues(), constraint.getDomains().orElseThrow()));
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error reading transaction log for " + tableSnapshot.getTable(), e);