feat(spark): Make Hudi's analyzed plans introspectable to lineage tooling#18726
feat(spark): Make Hudi's analyzed plans introspectable to lineage tooling#18726codope wants to merge 2 commits into
Conversation
…ling Signed-off-by: codope <sagarsumit09@gmail.com>
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! The PR makes Hudi MERGE plans and path-based incremental/CDC reads introspectable to lineage tooling — exposing mergeInto via innerChildren while preserving the Catalyst leaf contract, and stamping a synthesized CatalogTable onto incremental/CDC LogicalRelations. The big concern from this pass is a Spark 4 compile compatibility issue in HoodieIncrementalRelationIdentifier — please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. One small naming nit below; the rest of the code is clean and well-documented.
…mentalRelationIdentifier Signed-off-by: codope <sagarsumit09@gmail.com>
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the iteration! The Spark-version-agnostic type pattern in HoodieIncrementalRelationIdentifier and the innerChildren addition for MergeIntoHoodieTableCommand both look reasonable on this pass. I traced the rule's interaction with downstream catalogTable.isDefined callers and Spark's partition-tracking rules and didn't find regressions. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One naming nit below — otherwise the new rule and tests are clear and well-commented.
cc @yihua
| * - Path-based snapshot reads have a working file-path-based fallback in existing | ||
| * lineage tooling; changing their behavior is a separate decision. | ||
| */ | ||
| object HoodieIncrementalRelationIdentifier extends Rule[LogicalPlan] { |
There was a problem hiding this comment.
🤖 nit: the object handles both incremental and CDC reads (per isIncrementalOrCDC), but the name only mentions Incremental. Could you rename to something like HoodieIncrementalAndCDCRelationIdentifier (or just HoodiePathBasedRelationIdentifier) so future readers searching for CDC behavior find it here?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| if lr.catalogTable.isEmpty | ||
| && lr.relation.isInstanceOf[HadoopFsRelation] | ||
| && isIncrementalOrCDC(lr.relation.asInstanceOf[HadoopFsRelation].location) => | ||
| val fsRelation = lr.relation.asInstanceOf[HadoopFsRelation] |
There was a problem hiding this comment.
🤖 nit: lr.relation.asInstanceOf[HadoopFsRelation] is repeated three times across the guard, the rebinding, and the metaClient extraction. Could you bind it once (e.g. an @ pattern or a val after the match) so the cast appears in only one place?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18726 +/- ##
============================================
+ Coverage 68.10% 68.14% +0.03%
- Complexity 29020 29092 +72
============================================
Files 2516 2517 +1
Lines 140934 141135 +201
Branches 17475 17514 +39
============================================
+ Hits 95987 96175 +188
+ Misses 37058 37046 -12
- Partials 7889 7914 +25
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
Today, a downstream consumer that walks an analyzed Spark plan to extract column lineage hits two dead ends on Hudi:
MergeIntoHoodieTableCommandextendsHoodieLeafRunnableCommand, which forceschildren = Nil. That is the right call for the Catalyst optimizer (we don't want generic optimizer rules rewriting the source/target subtrees out from under our custom resolution), but it also hides the source plan, target plan, and merge condition from every plan walker, so lineage tooling sees a leaf with no inputs and reports the merge as having no upstream columns. The other Hudi write commands (Insert,Update,Delete,CTAS) were already migrated toDataWritingCommand/UnaryCommandin [HUDI-7915] Spark 4 support #12772, which exposes the source plan via standardchildren, so MERGE is the lone gap on master.spark.read.format("hudi").option("hoodie.datasource.query.type", "incremental").load(path), the resultingLogicalRelation.catalogTableisNone. Lineage tooling then falls back to the relation's class name ("HadoopFsRelation") as the dataset identifier, which collides across every Hudi incremental read in the job and is useless for tracking which table a query came from.Summary and Changelog
Make Hudi's analyzed plans introspectable to lineage tooling that walks Catalyst plans (e.g. OpenLineage Spark integration, Atlas, custom analyzer extensions, etc.). Two changes:
MergeIntoHoodieTableCommand.innerChildren: expose the analyzedMergeIntoTable(source plan, target plan, merge condition, matched / not-matched actions) without breaking the optimizer leaf contract.HoodieIncrementalRelationIdentifier: post-hoc analyzer rule that stamps a synthesizedCatalogTable(table name, base path, schema) ontoLogicalRelations backed byHoodieIncrementalFileIndex/HoodieCDCFileIndexwhen the read entered via path (i.e. no catalog registration).Tracking issue: #18298
Impact
innerChildrenfor MERGETreeNode.innerChildrenis Spark's documented escape hatch for plan nodes that need to expose subtrees for display/inspection without participating in optimizer traversal. It is not visited bytransform/mapChildren, so there is no risk of generic Catalyst rules re-writing the innerMergeIntoTable. It is rendered byEXPLAINand is the conventional access point for plan walkers. This is the same pattern Spark itself uses forWriteToDataSourceV2Execand other commands that wrap a logical plan they don't want optimized as a child. TheHoodieLeafLike#children = Nilcontract is preserved. This change is a purely additive method override.HoodieIncrementalRelationIdentifierThe rule runs as a
customPostHocResolutionRule, so it sees the resolved plan after Hudi's own analyzer rules have built the relation. It only fires when all of the following hold:LogicalRelationoverHadoopFsRelationcatalogTableisNone(catalog-registered reads are left alone)HoodieIncrementalFileIndexorHoodieCDCFileIndexWhen it matches, it pulls the table name, base path, and database name from the existing
HoodieTableMetaClientcarried by the existing file index (no extra metadata / FS calls), and synthesizes aCatalogTablefrom that plus the relation's resolved schema. Database falls back to Spark'sdefaultwhenhoodie.database.nameis unset, matching existing path-based read behavior.Why scope is limited to incremental and CDC:
catalogTable.The transform is wrapped in
AnalysisHelper.allowInvokingTransformsInAnalyzer { ... }, matching existing convention inHoodieAnalysis.scala(e.g.AdaptIngestionTargetLogicalRelations).Risk Level
Low
innerChildrenis consumed only byEXPLAINrendering and opt-in plan walkers; nothing in Hudi's write path or Spark's optimizer traverses it.catalogTableis already set, so catalog-registered tables are unaffected.Documentation Update
None
Test plan
Two new Spark integration tests under
hudi-spark-datasource/hudi-spark/src/test/scala/.../analysis/:TestMergeIntoHoodieTableCommandInnerChildren:testMergeIntoExposesAnalyzedMergeIntoTableViaInnerChildren: assertscmd.children.isEmpty(leaf contract preserved) andcmd.innerChildrencontains the analyzedMergeIntoTablewith reachable source/target leaves, non-null merge condition, and the expected matched / not-matched actions.testExplainShowsSourceTableViaInnerChildren: asserts the source view name shows up inEXPLAINoutput, proving the round-trip through Spark's renderer works.TestHoodieIncrementalRelationIdentifier:testPathBasedIncrementalReadGetsCatalogTable(parameterized overcow/mor): asserts the rule populatescatalogTablewith the Hudi table name, base-path URI, schema, andprovider = "hudi".testCatalogRegisteredIncrementalReadIsNotMutated: asserts that catalog-registered tables keep their originalcatalogTable(rule doesn't over-fire).testSnapshotPathBasedReadIsNotEnriched: asserts snapshot path reads remain untouched (scope is intentional).Contributor's checklist