Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postpone loading of the block in all field extractors #25306

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ private DeltaLakeTransactionLogEntry buildProtocolEntry(ConnectorSession session
if (block.isNull(pagePosition)) {
return null;
}

block = block.getLoadedBlock();
RowType type = protocolType.orElseThrow();
int minProtocolFields = 2;
int maxProtocolFields = 4;
Expand Down Expand Up @@ -416,6 +418,8 @@ private DeltaLakeTransactionLogEntry buildMetadataEntry(ConnectorSession session
if (block.isNull(pagePosition)) {
return null;
}

block = block.getLoadedBlock();
RowType type = metadataType.orElseThrow();
int metadataFields = 8;
int formatFields = 2;
Expand Down Expand Up @@ -455,6 +459,8 @@ private DeltaLakeTransactionLogEntry buildRemoveEntry(ConnectorSession session,
if (block.isNull(pagePosition)) {
return null;
}

block = block.getLoadedBlock();
RowType type = removeType.orElseThrow();
int removeFields = 4;
SqlRow removeEntryRow = getRow(block, pagePosition);
Expand Down Expand Up @@ -485,6 +491,8 @@ private DeltaLakeTransactionLogEntry buildSidecarEntry(ConnectorSession session,
if (block.isNull(pagePosition)) {
return null;
}

block = block.getLoadedBlock();
int sidecarFields = 4;
SqlRow sidecarEntryRow = getRow(block, pagePosition);
if (sidecarEntryRow.getFieldCount() != sidecarFields) {
Expand Down Expand Up @@ -669,6 +677,8 @@ private DeltaLakeTransactionLogEntry buildTxnEntry(ConnectorSession session, int
if (block.isNull(pagePosition)) {
return null;
}

block = block.getLoadedBlock();
RowType type = txnType.orElseThrow();
int txnFields = 3;
SqlRow txnEntryRow = getRow(block, pagePosition);
Expand Down Expand Up @@ -758,14 +768,8 @@ private void fillNextEntries()
// process page
int blockIndex = 0;
for (CheckpointFieldExtractor extractor : extractors) {
DeltaLakeTransactionLogEntry entry;
if (extractor instanceof AddFileEntryExtractor) {
// Avoid unnecessary loading of the block in case there is a partition predicate mismatch for this add entry
entry = extractor.getEntry(session, pagePosition, page.getBlock(blockIndex));
}
else {
entry = extractor.getEntry(session, pagePosition, page.getBlock(blockIndex).getLoadedBlock());
}
// Avoid unnecessary loading of the block
DeltaLakeTransactionLogEntry entry = extractor.getEntry(session, pagePosition, page.getBlock(blockIndex));
if (entry != null) {
nextEntries.add(entry);
}
Expand Down