Skip to content

feat(spark): Make Hudi's analyzed plans introspectable to lineage tooling#18726

Open
codope wants to merge 2 commits into
apache:masterfrom
codope:hudi-18298
Open

feat(spark): Make Hudi's analyzed plans introspectable to lineage tooling#18726
codope wants to merge 2 commits into
apache:masterfrom
codope:hudi-18298

Conversation

@codope
Copy link
Copy Markdown
Member

@codope codope commented May 13, 2026

Describe the issue this Pull Request addresses

Today, a downstream consumer that walks an analyzed Spark plan to extract column lineage hits two dead ends on Hudi:

  • Hudi MERGE is opaque. MergeIntoHoodieTableCommand extends HoodieLeafRunnableCommand, which forces children = Nil. That is the right call for the Catalyst optimizer (we don't want generic optimizer rules rewriting the source/target subtrees out from under our custom resolution), but it also hides the source plan, target plan, and merge condition from every plan walker, so lineage tooling sees a leaf with no inputs and reports the merge as having no upstream columns. The other Hudi write commands (Insert, Update, Delete, CTAS) were already migrated to DataWritingCommand/UnaryCommand in [HUDI-7915] Spark 4 support #12772, which exposes the source plan via standard children, so MERGE is the lone gap on master.
  • Path-based incremental / CDC reads are anonymous. When a user runs spark.read.format("hudi").option("hoodie.datasource.query.type", "incremental").load(path), the resulting LogicalRelation.catalogTable is None. Lineage tooling then falls back to the relation's class name ("HadoopFsRelation") as the dataset identifier, which collides across every Hudi incremental read in the job and is useless for tracking which table a query came from.

Summary and Changelog

Make Hudi's analyzed plans introspectable to lineage tooling that walks Catalyst plans (e.g. OpenLineage Spark integration, Atlas, custom analyzer extensions, etc.). Two changes:

  1. MergeIntoHoodieTableCommand.innerChildren: expose the analyzed MergeIntoTable (source plan, target plan, merge condition, matched / not-matched actions) without breaking the optimizer leaf contract.
  2. HoodieIncrementalRelationIdentifier: post-hoc analyzer rule that stamps a synthesized CatalogTable (table name, base path, schema) onto LogicalRelations backed by HoodieIncrementalFileIndex / HoodieCDCFileIndex when the read entered via path (i.e. no catalog registration).

Tracking issue: #18298

Impact

innerChildren for MERGE

TreeNode.innerChildren is Spark's documented escape hatch for plan nodes that need to expose subtrees for display/inspection without participating in optimizer traversal. It is not visited by transform / mapChildren, so there is no risk of generic Catalyst rules re-writing the inner MergeIntoTable. It is rendered by EXPLAIN and is the conventional access point for plan walkers. This is the same pattern Spark itself uses for WriteToDataSourceV2Exec and other commands that wrap a logical plan they don't want optimized as a child. The HoodieLeafLike#children = Nil contract is preserved. This change is a purely additive method override.

HoodieIncrementalRelationIdentifier

The rule runs as a customPostHocResolutionRule, so it sees the resolved plan after Hudi's own analyzer rules have built the relation. It only fires when all of the following hold:

  • The node is a LogicalRelation over HadoopFsRelation
  • catalogTable is None (catalog-registered reads are left alone)
  • The location is HoodieIncrementalFileIndex or HoodieCDCFileIndex

When it matches, it pulls the table name, base path, and database name from the existing HoodieTableMetaClient carried by the existing file index (no extra metadata / FS calls), and synthesizes a CatalogTable from that plus the relation's resolved schema. Database falls back to Spark's default when hoodie.database.name is unset, matching existing path-based read behavior.

Why scope is limited to incremental and CDC:

  • Catalog reads already populate catalogTable.
  • Path-based snapshot reads have a working file-path-based fallback in existing lineage extractors. Whether to enrich them too is a separate decision (easy to extend later.

The transform is wrapped in AnalysisHelper.allowInvokingTransformsInAnalyzer { ... }, matching existing convention in HoodieAnalysis.scala (e.g. AdaptIngestionTargetLogicalRelations).

Risk Level

Low

  • innerChildren is consumed only by EXPLAIN rendering and opt-in plan walkers; nothing in Hudi's write path or Spark's optimizer traverses it.
  • The analyzer rule is a no-op when catalogTable is already set, so catalog-registered tables are unaffected.
  • No write path, no public API surface, no config changes.

Documentation Update

None

Test plan

Two new Spark integration tests under hudi-spark-datasource/hudi-spark/src/test/scala/.../analysis/:

TestMergeIntoHoodieTableCommandInnerChildren:

  • testMergeIntoExposesAnalyzedMergeIntoTableViaInnerChildren: asserts cmd.children.isEmpty (leaf contract preserved) and cmd.innerChildren contains the analyzed MergeIntoTable with reachable source/target leaves, non-null merge condition, and the expected matched / not-matched actions.
  • testExplainShowsSourceTableViaInnerChildren: asserts the source view name shows up in EXPLAIN output, proving the round-trip through Spark's renderer works.

TestHoodieIncrementalRelationIdentifier:

  • testPathBasedIncrementalReadGetsCatalogTable (parameterized over cow / mor): asserts the rule populates catalogTable with the Hudi table name, base-path URI, schema, and provider = "hudi".
  • testCatalogRegisteredIncrementalReadIsNotMutated: asserts that catalog-registered tables keep their original catalogTable (rule doesn't over-fire).
  • testSnapshotPathBasedReadIsNotEnriched: asserts snapshot path reads remain untouched (scope is intentional).

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

…ling

Signed-off-by: codope <sagarsumit09@gmail.com>
@github-actions github-actions Bot added the size:L PR with lines of changes in (300, 1000] label May 13, 2026
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! The PR makes Hudi MERGE plans and path-based incremental/CDC reads introspectable to lineage tooling — exposing mergeInto via innerChildren while preserving the Catalyst leaf contract, and stamping a synthesized CatalogTable onto incremental/CDC LogicalRelations. The big concern from this pass is a Spark 4 compile compatibility issue in HoodieIncrementalRelationIdentifier — please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. One small naming nit below; the rest of the code is clean and well-documented.

…mentalRelationIdentifier

Signed-off-by: codope <sagarsumit09@gmail.com>
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the iteration! The Spark-version-agnostic type pattern in HoodieIncrementalRelationIdentifier and the innerChildren addition for MergeIntoHoodieTableCommand both look reasonable on this pass. I traced the rule's interaction with downstream catalogTable.isDefined callers and Spark's partition-tracking rules and didn't find regressions. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One naming nit below — otherwise the new rule and tests are clear and well-commented.

cc @yihua

* - Path-based snapshot reads have a working file-path-based fallback in existing
* lineage tooling; changing their behavior is a separate decision.
*/
object HoodieIncrementalRelationIdentifier extends Rule[LogicalPlan] {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the object handles both incremental and CDC reads (per isIncrementalOrCDC), but the name only mentions Incremental. Could you rename to something like HoodieIncrementalAndCDCRelationIdentifier (or just HoodiePathBasedRelationIdentifier) so future readers searching for CDC behavior find it here?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

if lr.catalogTable.isEmpty
&& lr.relation.isInstanceOf[HadoopFsRelation]
&& isIncrementalOrCDC(lr.relation.asInstanceOf[HadoopFsRelation].location) =>
val fsRelation = lr.relation.asInstanceOf[HadoopFsRelation]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: lr.relation.asInstanceOf[HadoopFsRelation] is repeated three times across the guard, the rebinding, and the metaClient extraction. Could you bind it once (e.g. an @ pattern or a val after the match) so the cast appears in only one place?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 82.60870% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.14%. Comparing base (b6bc165) to head (21143fe).
⚠️ Report is 7 commits behind head on master.

Files with missing lines Patch % Lines
...pache/spark/sql/hudi/analysis/HoodieAnalysis.scala 82.60% 0 Missing and 4 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18726      +/-   ##
============================================
+ Coverage     68.10%   68.14%   +0.03%     
- Complexity    29020    29092      +72     
============================================
  Files          2516     2517       +1     
  Lines        140934   141135     +201     
  Branches      17475    17514      +39     
============================================
+ Hits          95987    96175     +188     
+ Misses        37058    37046      -12     
- Partials       7889     7914      +25     
Flag Coverage Δ
common-and-other-modules 44.41% <21.73%> (+0.03%) ⬆️
hadoop-mr-java-client 45.00% <ø> (-0.03%) ⬇️
spark-client-hadoop-common 48.32% <ø> (-0.03%) ⬇️
spark-java-tests 49.02% <82.60%> (+0.02%) ⬆️
spark-scala-tests 44.91% <82.60%> (+0.01%) ⬆️
utilities 37.62% <78.26%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...pache/spark/sql/hudi/analysis/HoodieAnalysis.scala 74.91% <82.60%> (+0.95%) ⬆️

... and 47 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@hudi-bot
Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L PR with lines of changes in (300, 1000]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants