Fix spark logic in DLO strategy generation #293

Merged · 2 commits merged into linkedin:main on Mar 7, 2025

Conversation


@cbb330 cbb330 commented Mar 7, 2025

Summary

Problem: after deploying changes to the Spark cluster, DLO strategy generation fails with:

org.apache.spark.SparkException: ...  java.io.IOException: unexpected exception type
...
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248)
	... 53 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
	at com.linkedin.openhouse.datalayout.generator.OpenHouseDataLayoutStrategyGenerator.$deserializeLambda$(OpenHouseDataLayoutStrategyGenerator.java:27)
	... 63 more
	...
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3005)
	at com.linkedin.openhouse.datalayout.generator.OpenHouseDataLayoutStrategyGenerator.computeFileStats(OpenHouseDataLayoutStrategyGenerator.java:237)
	at com.linkedin.openhouse.datalayout.generator.OpenHouseDataLayoutStrategyGenerator.buildDataLayoutStrategy(OpenHouseDataLayoutStrategyGenerator.java:130)
	at com.linkedin.openhouse.datalayout.generator.OpenHouseDataLayoutStrategyGenerator.generateTableLevelStrategies(OpenHouseDataLayoutStrategyGenerator.java:64)
	at com.linkedin.openhouse.jobs.spark.DataLayoutStrategyGeneratorSparkApp.runInnerTableScope(DataLayoutStrategyGeneratorSparkApp.java:53)
	at com.linkedin.openhouse.jobs.spark.DataLayoutStrategyGeneratorSparkApp.runInner(DataLayoutStrategyGeneratorSparkApp.java:46)
	at com.linkedin.openhouse.jobs.spark.BaseSparkApp.run(BaseSparkApp.java:56)
	at com.linkedin.openhouse.jobs.spark.DataLayoutStrategyGeneratorSparkApp.main(DataLayoutStrategyGeneratorSparkApp.java:185)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

Solution: execute the lambda within a plain Java stream on the driver instead of a Spark stream, so it is never serialized to the executors.

Explanation:
This bug cannot be reproduced in a unit test.

I was also unable to reproduce it in spark-shell, though that may be due to differences between the actual deployed code and the re-created environment's code.

There were challenges using a JDWP debugger against Spark: a FileContent ClassNotFoundException and a FileContent.DATA method-not-found error, possibly due to IntelliJ not handling the shaded class well.

Only guesses could be gathered from debugging, searching, and investigating the stack trace, and they were made less conclusive by the fact that nearly the same logic executes a few lines earlier without any issue.
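
My best guess at the mechanism (a reading of the stack trace, not something the debugger confirmed): Spark ships closures with Java serialization, and a serialized lambda is rehydrated by invoking the synthetic `$deserializeLambda$` method on its capturing class; if the bytecode visible to the deserializing JVM does not match what was serialized (for example after shading or relocation), that lookup fails with `Invalid lambda deserialization`, which matches the top frame `OpenHouseDataLayoutStrategyGenerator.$deserializeLambda$` in the trace. A minimal, self-contained sketch of that round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

public class LambdaRoundTrip {
  // A lambda is only Java-serializable if its target interface is Serializable,
  // which is why Spark's functional interfaces all extend Serializable.
  interface SerPredicate<T> extends Predicate<T>, Serializable {}

  public static void main(String[] args) throws Exception {
    SerPredicate<String> p = s -> s.startsWith("data");

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      // Written as a SerializedLambda that points back at LambdaRoundTrip.
      out.writeObject(p);
    }
    try (ObjectInputStream in = new ObjectInputStream(
        new ByteArrayInputStream(bos.toByteArray()))) {
      // readResolve invokes LambdaRoundTrip.$deserializeLambda$; this succeeds
      // here because the same class bytecode is present, but it throws
      // IllegalArgumentException("Invalid lambda deserialization") when the
      // capturing class on the receiving side does not match.
      @SuppressWarnings("unchecked")
      SerPredicate<String> q = (SerPredicate<String>) in.readObject();
      System.out.println(q.test("datafile")); // true
    }
  }
}
```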

So we decided to avoid pushing the lambda expression down to the Spark executors entirely, and to rely on just a Java stream, as sketched below.
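
A minimal sketch of the shape of the change (`FileInfo` and `keepMatching` are illustrative stand-ins, not the exact OpenHouse code):

```java
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;

class FileFilterSketch {
  // Illustrative stand-in for the real row type; not the OpenHouse class.
  public static class FileInfo implements java.io.Serializable {
    private String content;
    public String getContent() { return content; }
  }

  static List<FileInfo> keepMatching(Dataset<FileInfo> files, String content) {
    // Before (sketch): files.filter(f -> f.getContent().equals(content))
    // serializes the lambda to the executors, where deserialization failed
    // with "Invalid lambda deserialization".
    //
    // After: collect to the driver first and filter with java.util.stream,
    // so the lambda never leaves the driver JVM and is never serialized.
    return files.collectAsList().stream()
        .filter(f -> f.getContent().equals(content))
        .collect(Collectors.toList());
  }
}
```

The trade-off is that the rows are materialized on the driver first; that should be acceptable here since the dataset holds per-file stats rather than table data.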

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

Since the JDWP debugger could not effectively troubleshoot this issue, I built open-source OpenHouse:
./gradlew build

then copied the uber jar into li-openhouse's Spark deployable, referencing it in Gradle like:
implementation(files("/Users/chbush/code/li/openhouse/build/openhouse-spark-apps_2.12/libs/openhouse-spark-apps_2.12-1.0.0-uber.jar"))

I unpacked the jar with `jar xf`, inspected the bytecode of the new deployable, and confirmed the updated code was inside.
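
(As an aside, a quick programmatic alternative to unpacking by hand; the jar path below is illustrative:)

```java
import java.util.jar.JarFile;

public class JarCheck {
  public static void main(String[] args) throws Exception {
    // Point this at the actual uber jar produced by the build.
    try (JarFile jar = new JarFile("openhouse-spark-apps_2.12-1.0.0-uber.jar")) {
      String entry = "com/linkedin/openhouse/datalayout/generator/"
          + "OpenHouseDataLayoutStrategyGenerator.class";
      // getJarEntry returns null if the class is missing from the jar.
      System.out.println(entry
          + (jar.getJarEntry(entry) != null ? " present" : " missing"));
    }
  }
}
```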

Then I deployed this to a staging Artifactory repo and referenced the jar in Spark. Once it was executing, I attached a JDWP debugger to confirm that the line of code I expected was being triggered: `files.collectAsList().stream().filter(file -> file.getContent().equals(content));`

The entirety of the task executes, but finishes with a new error:

org.apache.spark.sql.AnalysisException: Cannot write to 'openhouse.u_openhouse.dlo_strategies', too many data columns:
Table columns: 'fqtn', 'timestamp', 'estimated_compute_cost', 'estimated_file_count_reduction', 'file_size_entropy'
Data columns: 'col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8', 'col9', 'col10', 'col11'

This is expected on the staging cluster, since these columns do not exist there yet 🥳

Fixing the staging cluster with the desired columns:

+-----------+------------------------+-----------+
|namespace  |tableName               |isTemporary|
+-----------+------------------------+-----------+
|u_openhouse|dctest                  |false      |
|u_openhouse|dlo_partition_strategies|false      |
|u_openhouse|dlo_strategies          |false      |
|u_openhouse|dlo_t1                  |false      |
|u_openhouse|t1                      |false      |
|u_openhouse|t11                     |false      |
|u_openhouse|t12                     |false      |
|u_openhouse|t2                      |false      |
|u_openhouse|t3                      |false      |
|u_openhouse|t4                      |false      |
|u_openhouse|t5                      |false      |
|u_openhouse|test_rohit              |false      |
|u_openhouse|test_rohit1             |false      |
|u_openhouse|test_rohit11            |false      |
|u_openhouse|test10                  |false      |
|u_openhouse|test8                   |false      |
|u_openhouse|test9                   |false      |
+-----------+------------------------+-----------+

scala> val alterTableSQL = "ALTER TABLE openhouse.u_openhouse.dlo_strategies ADD COLUMNS (pos_delete_file_count LONG, eq_delete_file_count LONG, pos_delete_file_bytes LONG, eq_delete_file_bytes LONG, pos_delete_record_count LONG, eq_delete_record_count LONG)"
alterTableSQL: String = ALTER TABLE openhouse.u_openhouse.dlo_strategies ADD COLUMNS (pos_delete_file_count LONG, eq_delete_file_count LONG, pos_delete_file_bytes LONG, eq_delete_file_bytes LONG, pos_delete_record_count LONG, eq_delete_record_count LONG)

scala>

scala> spark.sql(alterTableSQL)
25/03/07 10:52:40 WARN iceberg.BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
res1: org.apache.spark.sql.DataFrame = []

scala> val alterTableSQL = "ALTER TABLE openhouse.u_openhouse.dlo_partition_strategies ADD COLUMNS (pos_delete_file_count LONG, eq_delete_file_count LONG, pos_delete_file_bytes LONG, eq_delete_file_bytes LONG, pos_delete_record_count LONG, eq_delete_record_count LONG)"
alterTableSQL: String = ALTER TABLE openhouse.u_openhouse.dlo_partition_strategies ADD COLUMNS (pos_delete_file_count LONG, eq_delete_file_count LONG, pos_delete_file_bytes LONG, eq_delete_file_bytes LONG, pos_delete_record_count LONG, eq_delete_record_count LONG)

scala>

scala> spark.sql(alterTableSQL)
25/03/07 10:56:25 WARN iceberg.BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
res2: org.apache.spark.sql.DataFrame = []

This results in success:

25/03/07 11:02:40 INFO  BaseMetastoreTableOperations: Successfully committed to table u_openhouse.dlo_strategies in 2499 ms

25/03/07 11:03:02 INFO  BaseMetastoreTableOperations: Successfully committed to table u_openhouse.dlo_partition_strategies in 582 ms

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

@cbb330 merged commit dad6f0c into linkedin:main on Mar 7, 2025
1 check passed