In Qbeast Spark, we create broadcast variables in multiple places (e.g., twice in BroadcastedTableChanges, once in DenormalizedBlock, and four times in OTreeDataAnalyzer), but we never unpersist or destroy them, so they occupy space in Spark's memory unnecessarily. Spark uses an LRU policy to evict cached data, but if we can tell that a broadcast is no longer used, we should clean it up ourselves to avoid conflicts with other workloads in the same Spark cluster.
The difficulty is that our API often passes DataFrames around, so the code that creates the broadcast is not the code that executes the final action on the DataFrame. Because DataFrames are evaluated lazily, the creating code has no clear way to know when it is safe to call unpersist.
For example, in this non-working Scala pseudo-code:
def f(df: DataFrame): DataFrame = {
  val bc = spark.sparkContext.broadcast(Seq(1, 2, 3))
  df.mapPartitions { rows =>
    val l = bc.value
    // do something with l
    rows
  }
  // I can't call bc.unpersist() here, as the broadcast hasn't been used yet.
}
val data = f(spark.range(10).toDF()).collect()
// Now I could unpersist bc, but I have no reference to it.
What went wrong?
Ideally, we should have something like
TODO
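One possible shape, offered only as a rough sketch and not as the agreed design: the code that runs the final action owns a small scope object, and every function that needs a broadcast creates it through that scope. The names `BroadcastScope`, `BroadcastScopeSketch`, and the implicit-parameter wiring are hypothetical and exist in neither Spark nor Qbeast Spark; only `SparkContext.broadcast`, `Broadcast.destroy`, and `Dataset.filter` are real Spark calls.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Hypothetical helper: remembers every broadcast created through it so the
// caller that runs the action can release them all afterwards.
final class BroadcastScope(spark: SparkSession) extends AutoCloseable {
  private val tracked = ArrayBuffer.empty[Broadcast[_]]

  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    val bc = spark.sparkContext.broadcast(value)
    tracked += bc
    bc
  }

  // destroy() removes the data on the driver and the executors. A DataFrame
  // that still references a destroyed broadcast cannot be executed again.
  override def close(): Unit = {
    tracked.foreach(_.destroy())
    tracked.clear()
  }
}

object BroadcastScopeSketch {

  // Library-side code: builds a lazy DataFrame that uses a broadcast,
  // but leaves the cleanup to whoever owns the scope.
  def f(df: DataFrame)(implicit scope: BroadcastScope): DataFrame = {
    val bc = scope.broadcast(Seq(1L, 2L, 3L))
    df.filter((row: Row) => bc.value.contains(row.getLong(0)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-scope-sketch")
      .getOrCreate()

    implicit val scope: BroadcastScope = new BroadcastScope(spark)
    try {
      // The action runs here, so this is the place that knows when
      // the broadcasts are no longer needed.
      val data = f(spark.range(10).toDF()).collect()
      println(s"collected ${data.length} rows")
    } finally {
      scope.close()
      spark.stop()
    }
  }
}
```

The trade-off is that the scope has to be threaded through every function that creates a broadcast, and any DataFrame built inside the scope must not be reused after `close()`. If reuse is needed, calling `unpersist()` instead of `destroy()` in `close()` would only drop the executor copies, and Spark would re-send the value on the next use.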