Better cache management with broadcast and caches. #406

Open · 4 tasks

cugni opened this issue Sep 10, 2024 · 0 comments
Labels
type: enhancement Improvement of existing feature or code type:performance


cugni commented Sep 10, 2024

What went wrong?

In Qbeast Spark, we call broadcast in multiple places (e.g., twice in BroadcastedTableChanges, once in DenormalizedBlock, and four times in OTreeDataAnalyzer), but we never unpersist or destroy the broadcasts, so they occupy Spark memory unnecessarily. Spark uses an LRU policy to evict the cache, but when we know a broadcast is no longer needed, we should clean it up to avoid competing with other workloads on the same Spark cluster.

The complexity of solving this is that our API often passes DataFrames around, so the part of the code that creates the broadcast is not the one that executes the final action on the DataFrame; there is no clear point at which to call unpersist.
For example, in this (non-working) Scala pseudo-code:

def f(df: DataFrame): DataFrame = {
  val bc = spark.sparkContext.broadcast(Seq(1, 2, 3))
  df.mapPartitions { rows =>
    val l = bc.value
    // do something with l
    rows
  }
  // I can't call bc.unpersist() here, as the broadcast hasn't been used yet.
}


val data = f(spark.range(10).toDF()).collect()
// Now I could unpersist bc, but I have no reference to it.
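One workaround is to make f return the broadcast handle alongside the result, so the caller can unpersist it after the action has run. A minimal self-contained sketch of the pattern, where the hypothetical FakeBroadcast stands in for Spark's Broadcast (and an eager map stands in for the lazy mapPartitions):

```scala
// FakeBroadcast is a stand-in for org.apache.spark.broadcast.Broadcast;
// only `value` and `unpersist()` are modeled here.
final class FakeBroadcast[T](val value: T) {
  var persisted: Boolean = true
  def unpersist(): Unit = persisted = false
}

// f returns both the transformed data and the broadcast handle, so the
// caller can release the broadcast once the final action has run.
def f(data: Seq[Int]): (Seq[Int], FakeBroadcast[Seq[Int]]) = {
  val bc = new FakeBroadcast(Seq(1, 2, 3))
  val out = data.map(x => x + bc.value.sum) // stand-in for mapPartitions
  (out, bc)
}

val (out, bc) = f(Seq(10, 20))
// The "action" has run; now it is safe to release the broadcast.
bc.unpersist()
```

This keeps cleanup explicit, at the cost of a more awkward signature for every function that creates a broadcast.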

Ideally, we should have something like

def f(df: DataFrame): DataFrame = {
  val bc = QbeastCacheContext.broadcast(Seq(1, 2, 3))
  df.mapPartitions { rows =>
    val l = bc.value
    // do something with l
    rows
  }
}

QbeastCacheContext.init()
val data = f(spark.range(10)).collect()
QbeastCacheContext.release()
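A hypothetical shape for such a context, sketched here without Spark dependencies (the register signature is an assumption; in real code the entries would be org.apache.spark.broadcast.Broadcast[_] and the cleanup would call unpersist()):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: a registry that tracks broadcast handles created
// during a job, so the driver can release them all after the final action.
object QbeastCacheContext {
  // Each entry is a deferred cleanup call (with Spark: bc.unpersist()).
  private val cleanups = ArrayBuffer.empty[() => Unit]

  def init(): Unit = cleanups.synchronized(cleanups.clear())

  // Register a handle together with the way to release it, and return it
  // unchanged so it can be used inline.
  def register[T](handle: T)(cleanup: T => Unit): T = {
    cleanups.synchronized(cleanups += (() => cleanup(handle)))
    handle
  }

  // Run every pending cleanup and forget the handles.
  def release(): Unit = cleanups.synchronized {
    cleanups.foreach(_())
    cleanups.clear()
  }
}
```

With Spark available, f would call something like QbeastCacheContext.register(spark.sparkContext.broadcast(...))(_.unpersist()), and the caller would wrap the final action between init() and release().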

TODO

  • Understand whether this is a problem (what is the impact of leaving an unused object in the cache)?
  • Propose an API.
  • Write code and tests for the API.
  • Update the code to use the new API.
@cugni cugni added type: enhancement Improvement of existing feature or code priority: normal This issue has normal priority type:performance labels Sep 10, 2024
@fpj fpj added type:performance and removed priority: normal This issue has normal priority type:performance labels Oct 7, 2024