Skip to content

Core: Detect and merge duplicate DVs for a data file and merge them before committing#15006

Open
amogh-jahagirdar wants to merge 28 commits intoapache:mainfrom
amogh-jahagirdar:always-merge-duplicates-on-driver
Open

Core: Detect and merge duplicate DVs for a data file and merge them before committing#15006
amogh-jahagirdar wants to merge 28 commits intoapache:mainfrom
amogh-jahagirdar:always-merge-duplicates-on-driver

Conversation

@amogh-jahagirdar
Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar commented Jan 9, 2026

While generally, writers are expected to merge DVs for a given data file before attempting to commit, we probably want to have a safeguard in the commit path in case this assumption is violated. This has been observed when AQE is enabled in Spark and a data file is split across multiple tasks (really just depends on how files and deletes are split); then multiple DVs are produced for a given data file, and then committed. Currently, after that commit reads would fail since the DeleteFileIndex detects the duplicates and fails on read.

Arguably, there should be a safeguard on the commit path which detects duplicates and fixes them up to prevent any invalid table states. Doing this behind the API covers any engine integration using the library.

This change updates MergingSnapshotProducer to track duplicate DVs for a datafile, and then merge them and produces a Puffin file per DV. Note that since we generally expect duplicates to be rare, we don't expect there to be too many small Puffins produced, and we don't add the additional logic to coalesce into larger files. Furthermore, these can later be compacted. In case of large scale duplicates, then engines should arguably fix those up before handing off to the commit path.

@github-actions github-actions bot added the core label Jan 9, 2026
@amogh-jahagirdar
Copy link
Contributor Author

amogh-jahagirdar commented Jan 9, 2026

Still cleaning some stuff up, so leaving in draft but feel free to comment. But basically there are some cases in Spark where a file can be split across multiple tasks, and if deletes happen to touch every single part in the task we'd incorrectly produce multiple DVs for a given data file (discovered this recently with a user when they had Spark AQE enabled, but I think file splitting can happen in more cases).

We currently throw on read in such cases, but ideally we can try and prevent this on write by detecting and merging pre-commit.

The reason this is done behind the API is largely so that we are defensive from a library perspective that in case an engine/integration happens to produce multiple DVs, we can at least fix it up pre-commit.

In the case there are too many to reasonably rewrite on a single node, then engines could do distributed writes to fix up before handing off the files to the API, but arguably from a library perspective it seems reasonable to pay this overhead to prevent bad commits across any integration.

@github-actions github-actions bot added the spark label Jan 9, 2026
addedFilesSummary.addedFile(spec, file);
hasNewDeleteFiles = true;
if (ContentFileUtil.isDV(file)) {
newDVRefs.add(file.referencedDataFile());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably keep a boolean in-case we detect a duplicate. That way we don't have to pay the price of grouping by referenced file everytime to detect possible duplicates; only if we detect it at the time of adding it, we can do the dedupe/merge

Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also could just keep a mapping specific for duplicates. That shrinks down how much work we need to do because instead of trying to group by every referenced data file in case of duplicates, we just go through the duplicates set. It's maybe a little more memory but if we consider that we expect duplicates to generally be rare it feels like a generally better solution

@amogh-jahagirdar amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch from 374b567 to c04d0e0 Compare January 11, 2026 20:02
DeleteFileSet deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create());
if (deleteFiles.add(file)) {
addedFilesSummary.addedFile(spec, file);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we may be merging duplicates, we don't update the summary for delete files until after we dedupe and are just about to write the new manifests

Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fix surfaced an issue in some of the TestPositionDeletesTable tests where we were setting the wrong data file for delete file; we'd just add a DV for the same data file, and then it'd get merged with the new logic , and break some of the later assertions.

// Add Data Files with EQ and POS deletes
DeleteFile fileADeletes = fileADeletes();
DeleteFile fileA2Deletes = fileA2Deletes();
DeleteFile fileBDeletes = fileBDeletes();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test had to be fixed after the recent changes because the file paths for data file B and B2 were set to the same before, so the DVs for both referenced the same file (but that probably wasn't the intention of these tests) so it was a duplicate. After this change we'd merge the DVs in the commit, and then it'd actually get treated as a dangling delete and fail some of the assertions.

Since these tests are just testing the eq. delete case we could just simplify it by removing the usage of fileB deletes, it's a more minimal test that tests the same thing.

Also note, generally I'd take this in a separate PR but I think there's a good argument that this change should be in a 1.10.2 patch release to prevent invalid table states; in that case we'd need to keep these changes together.

@amogh-jahagirdar amogh-jahagirdar marked this pull request as ready for review January 12, 2026 17:15
@amogh-jahagirdar amogh-jahagirdar changed the title Core: Merge DVs referencing the same data files as a safeguard Core: Track duplicate DVs for data file and merge them before committing Jan 12, 2026
@amogh-jahagirdar amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch from 78b772c to 6bccc52 Compare January 26, 2026 00:21
if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) {
ValidationException.check(
!newDVRefs.contains(file.referencedDataFile()),
!dvsByReferencedFile.containsKey(file.referencedDataFile()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah had an old PR out for this https://github.com/apache/iceberg/pull/11693/files#diff-410ff1b47d9a44a2fd5dbd103cad9463d82c8f4f51aa1be63b8b403123ab6e0e (probably a bad PR title since by definition for the operation if the positions are disjoint, it's not conflicting)

Comment on lines +92 to +93
private final List<DeleteFile> positionAndEqualityDeletes = Lists.newArrayList();
private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newLinkedHashMap();
Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue These are 2 disjoint fields, one for a list of v2 deletes and a multimap for DVs.
The map is a LinkedHashMap because we have a bunch of tests which have expectations on the exact orders of entries in a manifest. The previous change didn't require anything because we worked with the deleteFilesBySpec, and inherently preserved the order.

I personally think our tests should probably get away from expecting a certain order in manifests, and just assert the contents (or at least have validate methods that express either being strict on the ordering or not). As we get into V4, maybe we'll make implementation choices for ordering entries in a certain way but in the current state of things, it was kind of a hinderance to making changes here.

I didn't make the test change since it's fairly large, and can be distracting from this change and I figured the linkedhashma has negligible overhead so we can just preserve the existing behavior.

Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
Streams.stream(
Iterables.concat(
mergedDVs, validDVs, DeleteFileSet.of(positionAndEqualityDeletes)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue let me know how you feel about the DeleteFileSet.of(positionandEqualityDeletes).
I know we were kind of against de-duping but I think the fact that the two fields are disjoint now avoids that partition spec case you mentioned. I'm a bit worried that not deduping before producing the manifests is a regression compared to the previous behavior. And there's a good argument that if we can do it correctly, relatively cheaply, it's better to do it to avoid any bad metadata (similar to why we do it for data files).

The summary stats are anyways produced from this "final" deleteFilesBySpec which should be all correct so I think we're covered in general.

@amogh-jahagirdar amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch from 8bdfd4f to dc87d14 Compare January 27, 2026 16:40
@amogh-jahagirdar amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch from dc87d14 to 669f125 Compare January 27, 2026 16:42
@kevinjqliu kevinjqliu added the bug Something isn't working label Jan 28, 2026
@amogh-jahagirdar amogh-jahagirdar changed the title Core: Track duplicate DVs for data file and merge them before committing Core: Detect and merge duplicate DVs for a data file and merge them before committing Jan 29, 2026
// Use an unpartitioned spec for the location provider for the puffin containing
// all the merged DVs
OutputFileFactory.builderFor(
ops, PartitionSpec.unpartitioned(), FileFormat.PUFFIN, 1, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep the partition values (instead of always as an unpartitioned)? I know it would write multiple Puffin files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a problem. Puffin files can store DVs from any partition in the table and the partitions are tracked by DeleteFile in manifests. This isn't discarding partition information (which is still used to filter) but is instead controlling the placement of Puffin files.

// Merges the position indices for the duplicate DVs for a given referenced file
private static PositionDeleteIndex readDVsAndMerge(
TableOperations ops, List<DeleteFile> dvsForFile, ExecutorService pool) {
Preconditions.checkArgument(dvsForFile.size() > 1, "Expected more than 1 DV");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this needs to be an error case. If there is only one DV, then shouldn't this just return it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do feel like the caller is making a mistake if they call this with a single DV, but Ryan may be right that it doesn't have to be a runtime error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'll see how I can structure this better. I was also mostly thinking that calling this utility when there's 1 is a mistake. Also the way this helper is used is after the fact we presume we are going to write out a new puffin. If there's already 1 DV we really don't want to write that (since it's work that's already been done).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the main entrypoint to be mergeDvsIfRequired. The previous implementation presumed you'd only call it to merge duplicates but a nicer utility would be a general "just take these DVs and fix it up if they need fixing". So it'll merge the duplicates and write out a new puffin for the DVs that need it, and for everything else they're left as is and returned in the result list.

PositionDeleteIndex[] dvIndices = new PositionDeleteIndex[dvs.size()];
Tasks.range(dvIndices.length)
.executeWith(pool)
.stopOnFailure()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is in the commit path, do we want to retry?

Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should? This is all file I/O and I think I'd expect FileIOs under the hood to already be retrying. I'm not sure we should really compound the retries or if there's any benefit to doing that past what FileIO's are already doing, especially at this point where we're doing extra work in the commit path already.

.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> readDVsAndMerge(ops, entry.getValue(), threadpool)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling readDVsAndMerge sequentially for each group of DVs limits parallelism because each group must finish before reading the next group. Was this on purpose? I'd probably read DVs in a huge batch and then group and merge them.

It looks like this may be because the signature matches the structure used in MergingSnapshotProducer, in which case I recommend considering what a good method signature would be that works for this case and is flexible for other uses in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes largely because we've already computed the grouping and that we don't expect too many duplicates to begin with (so parallelism being limited didn't seem like too big of a deal unless there's a large amount of deletes), but you're right that parallelism is limited compared to just reading all the DVs across all threads and later regrouping them.

Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess in either case, even if there are few or many duplicates, we'd expect the bottleneck to just be the I/O. Merging/grouping probably is the least of the concern there (all assuming we'd rather just take the hit on memory completely).

Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this to read all the DVs that need to be merged in parallel rather than going group by group across the duplicates.

As far as method signature goes, I thought about it some more and I still think that at the moment the right thing to pass is the map of referenced file to the list of DVs, so I'd prefer to leave it as is. The reason is before reading all the DVs we first need to figure out which DVs are duplicate, which neccessitates grouping by referenced file. Since MergingSnapshotProducer already tracks it by referenced file that algins pretty well.

Another option is down the line we can also pass in just a List and defer the grouping to the utility but I don't think that's needed at the moment because we'd just have to regroup things (needlessly) again; we could add that when it's needed since it's package private. I could see this API being useful because we probably don't want to force a caller to have to compute the grouping.

@amogh-jahagirdar amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch 4 times, most recently from e14d18c to 1afc328 Compare February 5, 2026 15:38
@amogh-jahagirdar amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch 3 times, most recently from bc231a2 to edecb48 Compare February 5, 2026 16:59
@amogh-jahagirdar amogh-jahagirdar force-pushed the always-merge-duplicates-on-driver branch from edecb48 to 8087558 Compare February 5, 2026 17:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working core data spark

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants