
[SUPPORT] Hudi upsert operation taking too long when writing data for base table and metadata table #12828

Open

dataproblems opened this issue Feb 11, 2025 · 1 comment

dataproblems commented Feb 11, 2025

Describe the problem you faced

I am trying to perform an upsert to Hudi with a DataFrame of about 200M records, and I noticed that it takes an hour to complete. The table has the record level index enabled, and I would like to speed this upsert up so that it finishes in a matter of minutes rather than an hour. Please see the attached UI screenshots, specifically jobs 32 and 37 and, under them, stages 70 and 98 respectively.

We are upserting about 181,309,872 records to the table. I see 23,360 occurrences of fileId in the commit file, which I assume is the number of files that Hudi modifies during this upsert operation.
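
For reference, this is roughly how we counted the fileId occurrences (a minimal sketch; the commit file name below is an example, and we copied the file locally from S3 first):

```scala
// Rough sketch of how we counted fileId occurrences in the commit file.
// The instant/file name is an example, not the actual one from our timeline.
import scala.io.Source

val commitJson = Source.fromFile("20250211000000000.commit").mkString
val fileIdCount = "\"fileId\"".r.findAllMatchIn(commitJson).size
println(s"fileId occurrences: $fileIdCount") // ~23360 in our case
```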

Questions

  1. For stage 98, where does the 20003 task count come from? None of my Spark configs has that value, and I want to know whether I can configure it so that I can try increasing it to reduce the time spent updating the metadata table (a guess at possibly relevant configs is sketched at the end of this section).
  2. For stage 70, is there anything we can do to improve the time it takes to write the data files? We have to use a copy-on-write table so that our consumers see the latest version right after the commit. It would be good to understand whether there are any configurations we can tune for better performance here.
  3. Should we include any other configuration in our upsert configuration to ensure that the data upsert runs as efficiently as possible?
  4. Another puzzling thing I observed is the size of the commit files for upsert operations: they were almost as large as the commit file created when the table was first built with INSERT. Do you know why that might be? See the following image for the sizes.

[Screenshot: commit file sizes]

I'm trying to determine whether we have hit the lower bound on upsert time or whether there is still room for improvement.
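
In case it helps the discussion on question 1: these are the record index sizing options we have been eyeing as possible knobs for the metadata table write parallelism. This is only a guess on our side; we have not confirmed that they drive the task count in stage 98, and our understanding is that they mainly apply when the record index is first initialized.

```scala
// Record index sizing knobs we are considering (values are illustrative, not recommendations).
// We have NOT verified that these control the 20003 tasks observed in stage 98.
val recordIndexSizing = Map(
  "hoodie.metadata.record.index.min.filegroup.count" -> "10",
  "hoodie.metadata.record.index.max.filegroup.count" -> "10000"
)
```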

To Reproduce

  • Read the data
  • Upsert into Hudi (a minimal sketch of both steps follows)
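
A minimal sketch of the two steps above. Here `spark` is the session configured as shown under "Spark Configuration", `hudiUpsertOptions` stands for the option map listed under "Hudi upsert configuration" below, and the paths are placeholders:

```scala
// Minimal sketch of the reproduction steps; paths are placeholders.
import org.apache.spark.sql.SaveMode

val df = spark.read.parquet("s3://<bucket>/<input-prefix>/") // ~181M records

df.write
  .format("hudi")
  .options(hudiUpsertOptions) // the configuration map listed below
  .mode(SaveMode.Append)
  .save("s3://<bucket>/<table-path>/")
```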

Expected behavior

The upsert operation would finish in minutes as opposed to taking an hour.

Environment Description

  • Hudi version : 0.15.0

  • Spark version : 3.4.1

  • Hive version :

  • Hadoop version : 2.7.5

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Spark Configuration

      "spark.checkpoint.compress": "true",
      "spark.default.parallelism": "15800",
      "spark.driver.cores": "8",
      "spark.driver.defaultJavaOptions": "-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseStringDeduplication -Dlog4j2.configuration=log4j2.properties",
      "spark.driver.maxResultSize": "0",
      "spark.driver.memory": "53g",
      "spark.driver.memoryOverhead": "10g",
      "spark.eventLog.logBlockUpdates.enabled": "true",
      "spark.executor.cores": "8",
      "spark.executor.defaultJavaOptions": "-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseStringDeduplication -Dlog4j2.configuration=log4j2.properties",
      "spark.executor.memory": "53g",
      "spark.executor.memoryOverhead": "10g",
      "spark.hadoop.mapred.output.compress": "true",
      "spark.hadoop.mapred.output.compression.codec": "snappy",
      "spark.io.compression.codec": "zstd",
      "spark.memory.fraction": "0.80",
      "spark.memory.storageFraction": "0.30",
      "spark.num.executors": "395",
      "spark.rdd.compress": "true",
      "spark.scheduler.reporterThread.maxFailures": "5",
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
      "spark.shuffle.compress": "true",
      "spark.shuffle.spill.compress": "true",
      "spark.sql.adaptive.coalescePartitions.parallelismFirst": "false",
      "spark.sql.adaptive.enabled": "true",
      "spark.sql.autoBroadcastJoinThreshold": "20965760",
      "spark.sql.files.maxPartitionBytes": "536870912",
      "spark.sql.inMemoryColumnarStorage.compressed": "true",
      "spark.sql.parquet.fs.optimized.committer.optimization-enabled": "true",
      "spark.sql.shuffle.partitions": "15800",
      "spark.storage.level": "MEMORY_AND_DISK_SER"

Hudi upsert configuration

We disable the timeline server and use DIRECT markers, as suggested in another support ticket.

hoodie.embed.timeline.server -> false, 
hoodie.parquet.small.file.limit -> 1073741824, 
hoodie.metadata.record.index.enable -> true, 
hoodie.datasource.write.precombine.field -> $precombineField, 
hoodie.datasource.write.payload.class -> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload,
hoodie.metadata.index.column.stats.enable -> true,
hoodie.parquet.max.file.size -> 2147483648, 
hoodie.metadata.enable -> true, 
hoodie.index.type -> RECORD_INDEX, 
hoodie.datasource.write.operation -> upsert, 
hoodie.parquet.compression.codec -> snappy, 
hoodie.datasource.write.recordkey.field -> $recordKey, 
hoodie.table.name -> $tableName, 
hoodie.datasource.write.table.type -> COPY_ON_WRITE, 
hoodie.datasource.write.hive_style_partitioning -> true, 
hoodie.cleaner.policy -> KEEP_LATEST_COMMITS,
hoodie.write.markers.type -> DIRECT, 
hoodie.populate.meta.fields -> true, 
hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.SimpleKeyGenerator, 
hoodie.cleaner.policy.failed.writes -> LAZY, 
hoodie.upsert.shuffle.parallelism -> 23700, 
hoodie.cleaner.commits.retained -> 10, 
hoodie.datasource.write.partitionpath.field -> $partitionField

Spark UI screenshots with details

Details for Job 32: [screenshot]

Details for Job 37: [screenshot]

Details for Stage 70: [screenshot]

Details for Stage 98: [screenshot]

Spark Job View for the upsert operation: [screenshot]

Hudi table partition with object count and size

We tried using a partition key that logically partitions the data; however, due to the nature of our data, the partitions are skewed, as the screenshot shows.

[Screenshot: partition object counts and sizes]

danny0405 (Contributor) commented:

@ad1happy2go Can you help here?
