[WIP] Prune add file entries using column stats in Delta checkpoint iterator #25311

Draft · wants to merge 3 commits into base: master
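This PR threads the query's non-partition constraint down into the Delta checkpoint iterator, so add-file entries whose column statistics cannot overlap the predicate can be pruned while the checkpoint is read. Below is a minimal sketch of the overlap check involved, simplified to String column keys; the column name, the bounds, and the StatsPruningSketch class are illustrative assumptions, not code from this PR.

```java
import static io.trino.spi.type.BigintType.BIGINT;

import com.google.common.collect.ImmutableMap;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;

class StatsPruningSketch
{
    static boolean keepAddFileEntry()
    {
        // File-level statistics for column "x": min = 1, max = 100 (made-up values)
        TupleDomain<String> statisticsPredicate = TupleDomain.withColumnDomains(ImmutableMap.of(
                "x", Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 1L, true, 100L, true)), false)));

        // Query predicate pushed into the connector: x > 1000
        TupleDomain<String> nonPartitionConstraint = TupleDomain.withColumnDomains(ImmutableMap.of(
                "x", Domain.create(ValueSet.ofRanges(Range.greaterThan(BIGINT, 1000L)), false)));

        // Same overlap check the split manager already performs per add action;
        // the PR applies it inside the checkpoint iterator as well, so pruned
        // entries are never emitted in the first place.
        return nonPartitionConstraint.overlaps(statisticsPredicate); // false -> prune the file
    }
}
```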
@@ -1242,6 +1242,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
 tableHandle.getMetadataEntry(),
 tableHandle.getProtocolEntry(),
 tableHandle.getEnforcedPartitionConstraint(),
+tableHandle.getNonPartitionConstraint(),
 tableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
 Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
 while (addFileEntryIterator.hasNext()) {
@@ -1640,6 +1641,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
 deltaLakeTableHandle.getMetadataEntry(),
 deltaLakeTableHandle.getProtocolEntry(),
 deltaLakeTableHandle.getEnforcedPartitionConstraint(),
+deltaLakeTableHandle.getNonPartitionConstraint(),
 deltaLakeTableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
 Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
 while (addFileEntryIterator.hasNext()) {
@@ -2548,6 +2550,7 @@ private Map<String, DeletionVectorEntry> loadDeletionVectors(ConnectorSession se
 handle.getMetadataEntry(),
 handle.getProtocolEntry(),
 handle.getEnforcedPartitionConstraint(),
+handle.getNonPartitionConstraint(),
 handle.getProjectedColumns().orElse(ImmutableSet.of()))) {
 Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
 while (addFileEntryIterator.hasNext()) {
@@ -4286,6 +4289,7 @@ private Stream<AddFileEntry> getAddFileEntriesMatchingEnforcedPartitionConstrain
 tableHandle.getMetadataEntry(),
 tableHandle.getProtocolEntry(),
 tableHandle.getEnforcedPartitionConstraint(),
+tableHandle.getNonPartitionConstraint(),
 tableHandle.getProjectedColumns().orElse(ImmutableSet.of()));
 TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
 if (enforcedPartitionConstraint.isAll()) {
@@ -4407,11 +4411,11 @@ public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle ta
 }
 
 public static TupleDomain<DeltaLakeColumnHandle> createStatisticsPredicate(
-AddFileEntry addFileEntry,
+Optional<? extends DeltaLakeFileStatistics> stats,
 List<DeltaLakeColumnMetadata> schema,
 List<String> canonicalPartitionColumns)
 {
-return addFileEntry.getStats()
+return stats
 .map(deltaLakeFileStatistics -> withColumnDomains(
 schema.stream()
 .filter(column -> canUseInPredicate(column.columnMetadata()))
@@ -164,7 +164,7 @@ private List<Page> buildPages(ConnectorSession session)
 PageListBuilder pageListBuilder = PageListBuilder.forTable(tableMetadata);
 
 Map<Map<String, Optional<String>>, DeltaLakePartitionStatistics> statisticsByPartition;
-try (Stream<AddFileEntry> activeFiles = transactionLogAccess.loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), alwaysTrue())) {
+try (Stream<AddFileEntry> activeFiles = transactionLogAccess.loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), TupleDomain.all(), alwaysTrue())) {
 statisticsByPartition = getStatisticsByPartition(activeFiles);
 }
 
@@ -164,6 +164,7 @@ private Stream<DeltaLakeSplit> getSplits(
 tableHandle.getMetadataEntry(),
 tableHandle.getProtocolEntry(),
 tableHandle.getEnforcedPartitionConstraint(),
+tableHandle.getNonPartitionConstraint(),
 tableHandle.getProjectedColumns().orElse(ImmutableSet.of()));
 TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
 TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
@@ -232,7 +233,7 @@ private Stream<DeltaLakeSplit> getSplits(
 }
 
 TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
-addAction,
+addAction.getStats(),
 predicatedColumns,
 metadataEntry.getLowercasePartitionColumns());
 if (!nonPartitionConstraint.overlaps(statisticsPredicate)) {
@@ -34,6 +34,7 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -92,7 +93,8 @@ public DeltaLakeSplitSource(
 .exceptionally(throwable -> {
 // set trinoException before finishing the queue to ensure failure is observed instead of successful completion
 // (the field is declared as volatile to make sure that the change is visible right away to other threads)
-trinoException = new TrinoException(findErrorCode(throwable), "Failed to generate splits for " + this.tableName, throwable);
+ImmutableList<StackTraceElement> collect = Arrays.stream(throwable.getStackTrace()).collect(toImmutableList());
+trinoException = new TrinoException(findErrorCode(throwable), "Failed to generate splits for " + this.tableName + ", msg: " + throwable.getMessage() + ", stack trace:" + collect, throwable);
 try {
 // Finish the queue to wake up threads from queue.getBatchAsync()
 queue.finish();
@@ -131,6 +131,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
 tableHandle.getMetadataEntry(),
 tableHandle.getProtocolEntry(),
 tableHandle.getEnforcedPartitionConstraint(),
+tableHandle.getNonPartitionConstraint(),
 tableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
 Iterator<AddFileEntry> addEntryIterator = addEntries.iterator();
 while (addEntryIterator.hasNext()) {
@@ -159,7 +160,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
 }
 
 TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
-addEntry,
+addEntry.getStats(),
 predicatedColumns,
 tableHandle.getMetadataEntry().getLowercasePartitionColumns());
 if (!tableHandle.getNonPartitionConstraint().overlaps(statisticsPredicate)) {
@@ -96,6 +96,11 @@ public AddFileEntry(
 this.tags = tags;
 this.deletionVector = requireNonNull(deletionVector, "deletionVector is null");
 
+this.parsedStats = getDeltaLakeFileStatistics(stats, parsedStats);
+}
+
+public static Optional<? extends DeltaLakeFileStatistics> getDeltaLakeFileStatistics(Optional<String> stats, Optional<DeltaLakeParquetFileStatistics> parsedStats)
+{
 Optional<? extends DeltaLakeFileStatistics> resultParsedStats = Optional.empty();
 if (parsedStats.isPresent()) {
 resultParsedStats = parsedStats;
@@ -111,7 +116,7 @@ else if (stats.isPresent()) {
 stats.get());
 }
 }
-this.parsedStats = resultParsedStats;
+return resultParsedStats;
 }
 
 /**
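The stats-parsing logic is extracted from the AddFileEntry constructor into a public static helper, presumably so statistics can be parsed (and entries pruned) without first materializing an AddFileEntry. A hedged usage sketch combining it with the reworked createStatisticsPredicate follows; the wrapper method, its parameter names, and the static import of createStatisticsPredicate are assumptions, not code from this PR.

```java
// Hypothetical helper: derive the pruning predicate straight from the JSON "stats"
// string of an add action, using the new static AddFileEntry helper.
private static TupleDomain<DeltaLakeColumnHandle> toStatisticsPredicate(
        Optional<String> jsonStats,
        List<DeltaLakeColumnMetadata> predicatedColumns,
        MetadataEntry metadataEntry)
{
    Optional<? extends DeltaLakeFileStatistics> stats =
            AddFileEntry.getDeltaLakeFileStatistics(jsonStats, Optional.empty());
    return createStatisticsPredicate(stats, predicatedColumns, metadataEntry.getLowercasePartitionColumns());
}
```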
@@ -226,6 +226,7 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
 FileFormatDataSourceStats stats,
 Optional<MetadataAndProtocolEntry> metadataAndProtocol,
 TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
 Optional<Predicate<String>> addStatsMinMaxColumnFilter)
 throws IOException
 {
@@ -254,6 +255,7 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
 checkpoint,
 checkpointFile,
 partitionConstraint,
+nonPartitionConstraint,
 addStatsMinMaxColumnFilter));
 }
 
@@ -274,6 +276,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
 LastCheckpoint checkpoint,
 TrinoInputFile checkpointFile,
 TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
 Optional<Predicate<String>> addStatsMinMaxColumnFilter)
 {
 long fileSize;
@@ -298,6 +301,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
 checkpoint,
 checkpointFile,
 partitionConstraint,
+nonPartitionConstraint,
 addStatsMinMaxColumnFilter,
 fileSystem,
 fileSize);
@@ -316,6 +320,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
 checkpointRowStatisticsWritingEnabled,
 domainCompactionThreshold,
 partitionConstraint,
+nonPartitionConstraint,
 addStatsMinMaxColumnFilter);
 return stream(checkpointEntryIterator).onClose(checkpointEntryIterator::close);
 }
@@ -331,11 +336,26 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointTransactionLogEntrie
 LastCheckpoint checkpoint,
 TrinoInputFile checkpointFile,
 TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
 Optional<Predicate<String>> addStatsMinMaxColumnFilter,
 TrinoFileSystem fileSystem,
 long fileSize)
 {
-return getV2CheckpointEntries(session, entryTypes, metadataEntry, protocolEntry, checkpointSchemaManager, typeManager, stats, checkpoint, checkpointFile, partitionConstraint, addStatsMinMaxColumnFilter, fileSystem, fileSize)
+return getV2CheckpointEntries(
+session,
+entryTypes,
+metadataEntry,
+protocolEntry,
+checkpointSchemaManager,
+typeManager,
+stats,
+checkpoint,
+checkpointFile,
+partitionConstraint,
+nonPartitionConstraint,
+addStatsMinMaxColumnFilter,
+fileSystem,
+fileSize)
 .mapMulti((entry, builder) -> {
 // Sidecar files contain only ADD and REMOVE entry types. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec
 Set<CheckpointEntryIterator.EntryType> dataEntryTypes = Sets.intersection(entryTypes, Set.of(ADD, REMOVE));
@@ -358,6 +378,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointTransactionLogEntrie
 checkpointRowStatisticsWritingEnabled,
 domainCompactionThreshold,
 partitionConstraint,
+nonPartitionConstraint,
 addStatsMinMaxColumnFilter);
 stream(iterator).onClose(iterator::close).forEach(builder);
 });
@@ -374,6 +395,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointEntries(
 LastCheckpoint checkpoint,
 TrinoInputFile checkpointFile,
 TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
 Optional<Predicate<String>> addStatsMinMaxColumnFilter,
 TrinoFileSystem fileSystem,
 long fileSize)
@@ -406,6 +428,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointEntries(
 checkpointRowStatisticsWritingEnabled,
 domainCompactionThreshold,
 partitionConstraint,
+nonPartitionConstraint,
 addStatsMinMaxColumnFilter);
 return stream(checkpointEntryIterator)
 .onClose(checkpointEntryIterator::close);
@@ -333,13 +333,14 @@ public Stream<AddFileEntry> getActiveFiles(
 MetadataEntry metadataEntry,
 ProtocolEntry protocolEntry,
 TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
 Set<DeltaLakeColumnHandle> projectedColumns)
 {
 Set<String> baseColumnNames = projectedColumns.stream()
 .filter(DeltaLakeColumnHandle::isBaseColumn) // Only base column stats are supported
 .map(DeltaLakeColumnHandle::columnName)
 .collect(toImmutableSet());
-return getActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, baseColumnNames::contains);
+return getActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, nonPartitionConstraint, baseColumnNames::contains);
 }
 
 public Stream<AddFileEntry> getActiveFiles(
@@ -349,10 +350,22 @@ public Stream<AddFileEntry> getActiveFiles(
 ProtocolEntry protocolEntry,
 TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
 Predicate<String> addStatsMinMaxColumnFilter)
 {
+return getActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, TupleDomain.all(), addStatsMinMaxColumnFilter);
+}
+
+public Stream<AddFileEntry> getActiveFiles(
+ConnectorSession session,
+TableSnapshot tableSnapshot,
+MetadataEntry metadataEntry,
+ProtocolEntry protocolEntry,
+TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
+Predicate<String> addStatsMinMaxColumnFilter)
+{
 try {
 if (isCheckpointFilteringEnabled(session)) {
-return loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, addStatsMinMaxColumnFilter);
+return loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, nonPartitionConstraint, addStatsMinMaxColumnFilter);
 }
 
 TableVersion tableVersion = new TableVersion(new TableLocation(tableSnapshot.getTable(), tableSnapshot.getTableLocation()), tableSnapshot.getVersion());
@@ -383,7 +396,7 @@ public Stream<AddFileEntry> getActiveFiles(
 }
 
 List<AddFileEntry> activeFiles;
-try (Stream<AddFileEntry> addFileEntryStream = loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), alwaysTrue())) {
+try (Stream<AddFileEntry> addFileEntryStream = loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), TupleDomain.all(), alwaysTrue())) {
 activeFiles = addFileEntryStream.collect(toImmutableList());
 }
 return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles);
@@ -401,6 +414,7 @@ public Stream<AddFileEntry> loadActiveFiles(
 MetadataEntry metadataEntry,
 ProtocolEntry protocolEntry,
 TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
 Predicate<String> addStatsMinMaxColumnFilter)
 {
 List<Transaction> transactions = tableSnapshot.getTransactions();
@@ -414,6 +428,7 @@
 fileFormatDataSourceStats,
 Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)),
 partitionConstraint,
+nonPartitionConstraint,
 Optional.of(addStatsMinMaxColumnFilter))) {
 return activeAddEntries(checkpointEntries, transactions, fileSystem)
 .filter(partitionConstraint.isAll()
@@ -567,7 +582,7 @@ private <T> Stream<T> getEntries(
 List<Transaction> transactions = tableSnapshot.getTransactions();
 // Passing TupleDomain.all() because this method is used for getting all entries
 Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(
-session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty(), TupleDomain.all(), Optional.of(alwaysTrue()));
+session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty(), TupleDomain.all(), TupleDomain.all(), Optional.of(alwaysTrue()));
 
 return entryMapper.apply(
 checkpointEntries,
@@ -247,13 +247,13 @@ public static Object deserializeColumnValue(DeltaLakeColumnHandle column, String
 catch (RuntimeException e) {
 throw new TrinoException(
 GENERIC_INTERNAL_ERROR,
-format("Unable to parse value [%s] from column %s with type %s", valueString, column.baseColumnName(), column.baseType()),
+format("Unable to parse value [%s] from column %s with type %s, Parse exception", valueString, column.baseColumnName(), column.baseType()),
 e);
 }
 // Anything else is not a supported DeltaLake column
 throw new TrinoException(
 GENERIC_INTERNAL_ERROR,
-format("Unable to parse value [%s] from column %s with type %s", valueString, column.baseColumnName(), column.baseType()));
+format("Unable to parse value [%s] from column %s with type %s, Type missing", valueString, column.baseColumnName(), column.baseType()));
 }
 
 static Optional<LastCheckpoint> readLastCheckpoint(TrinoFileSystem fileSystem, String tableLocation)