
[SUPPORT] HoodieSparkSessionExtension and IcebergSparkSessionExtensions conflict #12808

Open
pravin1406 opened this issue Feb 7, 2025 · 7 comments
Labels
spark Issues related to spark

Comments

@pravin1406

I have a use case where I may have to call Spark procedures or metadata queries on Hudi tables as well as Iceberg tables within the same Spark job.
I have to run all my use cases from Spark SQL, so I can't call the Java APIs directly; hence I am using SQL procedures.
But when I set
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

it does not matter what value I set for spark_catalog; none of the combinations work.
I have tried keeping the Hoodie extension last and setting spark_catalog to the following, but it still doesn't help.

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
spark.sql.catalog.spark_catalog.uri = thrift://localhost:9083
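
For reference, the same settings expressed as a SparkSession builder (just a sketch equivalent to the spark-shell --conf flags above; the thrift URI is my local metastore):

import org.apache.spark.sql.SparkSession

// Sketch: same extension list and catalog settings as the config above.
val spark = SparkSession.builder()
  .appName("hudi-iceberg-procedures")
  .config("spark.sql.extensions",
    "org.apache.spark.sql.hudi.HoodieSparkSessionExtension," +
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .config("spark.sql.catalog.spark_catalog.uri", "thrift://localhost:9083")
  .getOrCreate()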

Only the Iceberg procedures work; if I reverse the order of the extensions, only the latter extension's procedures work.
I get the following errors:

org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException: Procedure default.show_commits not found
and
org.apache.spark.sql.AnalysisException: procedure: rewrite_data_files is not exists

Is there a way to make both extensions work, or even more extensions, which I do intend to add?

Steps to reproduce

Run spark-shell with the following two properties set:
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.jars=iceberg-spark-runtime-3.2_2.12-1.4.3.2.jar,hudi-spark3.2-bundle_2.12-0.14.1.4.jar

Expected behavior

Both Hudi and Iceberg SQL procedures should work within the same Spark session.

Environment Description

  • Hudi version : 0.14.1

  • Spark version : 3.3.2

  • Hadoop version : 3.1.3

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no

Sample stack traces in both cases:
scala> spark.sql("call show_commits(table => 'default.18nove_try1_rt', limit => 100)").show(false) ANTLR Tool version 4.9.3 used for code generation does not match the current runtime version 4.8ANTLR Runtime version 4.9.3 used for parser compilation does not match the current runtime version 4.8ANTLR Tool version 4.9.3 used for code generation does not match the current runtime version 4.8ANTLR Runtime version 4.9.3 used for parser compilation does not match the current runtime version 4.8 org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException: Procedure default.show_commits not found spark.sql.catalyst.analysis.NoSuchProcedureException: Procedure default.show_commits not found at org.apache.iceberg.spark.BaseCatalog.loadProcedure(BaseCatalog.java:57) at org.apache.iceberg.spark.SparkSessionCatalog.loadProcedure(SparkSessionCatalog.java:56) at org.apache.spark.sql.catalyst.analysis.ResolveProcedures$$anonfun$apply$1.applyOrElse(ResolveProcedures.scala:47) at org.apache.spark.sql.catalyst.analysis.ResolveProcedures$$anonfun$apply$1.applyOrElse(ResolveProcedures.scala:45) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.analysis.ResolveProcedures.apply(ResolveProcedures.scala:45) at org.apache.spark.sql.catalyst.analysis.ResolveProcedures.apply(ResolveProcedures.scala:41) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200) at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215) at 
org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)

scala> spark.sql("CALL spark_catalog.system.rewrite_data_files(table => 'iceberg_staging.iceberg_6nov_try1', options => map('max-file-size-bytes','1073741824'))").show(false) org.apache.spark.sql.AnalysisException: procedure: rewrite_data_files is not exists at org.apache.spark.sql.hudi.analysis.ResolveImplementations.loadProcedure(HoodieAnalysis.scala:472) at org.apache.spark.sql.hudi.analysis.ResolveImplementations.$anonfun$apply$3(HoodieAnalysis.scala:436) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323) at org.apache.spark.sql.hudi.analysis.ResolveImplementations.apply(HoodieAnalysis.scala:405) at org.apache.spark.sql.hudi.analysis.ResolveImplementations.apply(HoodieAnalysis.scala:401) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200) at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:193) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192) at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)

@ad1happy2go
Collaborator

@pravin1406 Only one of these extensions can effectively resolve procedures at a time. You may need separate Spark sessions if you want to use both.
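
For context, spark.sql.extensions is a static SQL conf, so it cannot be changed once the application is running; a quick way to confirm this from spark-shell (a sketch):

// Static SQL confs are read once when the session state is built and
// cannot be modified afterwards, so a session created with
// spark.newSession() keeps the same extension list.
scala> spark.conf.isModifiable("spark.sql.extensions")
res0: Boolean = false

In practice, switching to a different extension list means launching a separate application.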

@ad1happy2go ad1happy2go added the spark Issues related to spark label Feb 7, 2025
@github-project-automation github-project-automation bot moved this to ⏳ Awaiting Triage in Hudi Issue Support Feb 7, 2025
@rangareddy
Contributor

Hi @pravin1406

As Aditya mentioned in an earlier comment, we cannot execute procedures for two different table formats in a single Spark session; only one format's SQL procedures are resolvable per Spark session.

If we attempt to call the other format's procedures, we will encounter an exception like the following:

spark-sql (default)> CALL iceberg_catalog.system.rewrite_data_files('default.iceberg_tbl');
procedure: rewrite_data_files is not exists

Example code:

spark-sql \
    --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
    --conf spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.iceberg_catalog.type=hive
CREATE TABLE iceberg_catalog.default.iceberg_tbl (id bigint, data string) USING iceberg;

INSERT INTO iceberg_catalog.default.iceberg_tbl VALUES (1, 'a'), (2, 'b'), (3, 'c');

SELECT * FROM iceberg_catalog.default.iceberg_tbl;

CALL iceberg_catalog.system.rewrite_data_files('default.iceberg_tbl');

CREATE TABLE hudi_tbl (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI
PARTITIONED BY (city);

INSERT INTO hudi_tbl
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

SELECT * FROM hudi_tbl;

call show_commits(table => 'hudi_tbl', limit => 10);

@ad1happy2go ad1happy2go moved this from ⏳ Awaiting Triage to 👤 User Action in Hudi Issue Support Feb 7, 2025
@pravin1406
Author

pravin1406 commented Feb 8, 2025

@rangareddy @ad1happy2go

Shouldn't one format's parser delegate to the other format's parser when a procedure is not available to be executed?
If not, it defeats the purpose of Spark supporting multiple SQL extensions.
It also becomes an unnecessary issue to deal with, as many orgs have use cases where they use both formats on a need basis.

Also, I see Iceberg recently merged a PR that seems to handle this (I could be wrong):
apache/iceberg#11480

I think Hudi should also take care of this. I tested the PR above and it seems to work, by keeping the Iceberg extension after the Hudi extension.
What are your thoughts?

@rangareddy
Contributor

Hi @pravin1406

I will try to discuss this feature enhancement with the Hudi engineering team.

@ad1happy2go
Collaborator

@pravin1406 It's a little tricky to use both within the same Spark session. Do you have any ideas here?

@pravin1406
Author

I can think of implementing it exactly the way that Iceberg PR does: check whether the command is a Hudi command based on the existing procedures in Hudi, and otherwise leave it for the next extension to resolve (rough sketch below).
Will have a go at it and see if I can make it work.
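
Roughly what I have in mind (a hypothetical, self-contained sketch; the object name and procedure list are illustrative, not Hudi's actual classes or registry):

object ProcedureDelegationSketch {
  // Procedure names this extension knows how to handle (illustrative subset).
  private val ownProcedures: Set[String] = Set("show_commits", "run_clustering")

  // Return Some(...) when the procedure belongs to this extension,
  // None so the next extension's resolution rule can try it instead of
  // failing with "procedure ... is not exists".
  def tryResolve(procName: String): Option[String] =
    if (ownProcedures.contains(procName.toLowerCase)) Some(s"resolved by Hudi: $procName")
    else None

  def main(args: Array[String]): Unit = {
    println(tryResolve("show_commits"))        // Some(resolved by Hudi: show_commits)
    println(tryResolve("rewrite_data_files"))  // None -> leave it for Iceberg
  }
}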

@rangareddy
Contributor

Hi @pravin1406

Thanks a lot for your interest. Can you please create a JIRA and let us know if you need any help to get started?
