Evaluate visualizations in parallel using reactive observers #13219
Conversation
Most of `ExecutionService` now returns Futures, which are easier to compose and execute via thread pools.
Plus some minor tweaks for random failures.
Still fighting with the consequences of enabling parallel execution.
So we have to ensure that compilations done by visualizations are properly locked, but at the same time do not block the interpreter.
```java
public HostClassLoader() {
private static final HostClassLoader INSTANCE = new HostClassLoader();

public static HostClassLoader getClassLoader() {
```
- there are various attempts to rewrite this `HostClassLoader`
- one of them is Package libraries Java code as Java Modules #10714
  - it attempts to provide an independent classloader for each library
- making `HostClassLoader` a singleton goes against such efforts - but it may be good for purposes of this PR
- I guess we can unsingletonize the classloader later
Some thoughts on classloading.
```java
l = findClass(name);
if (resolve) {
  l.getMethods();
synchronized (INSTANCE) {
```
The future of `HostClassLoader` is to have multiple instances, thus this should probably be:

```diff
-    synchronized (INSTANCE) {
+    synchronized (this) {
```
otherwise we have problems with this code when removing the singleton nature of the class loader.
I believe I tried that before and got a deadlock, with one thread keeping a lock on the `ConcurrentHashMap` and the other holding a lock on `this`, both unable to proceed.
- Right now `this == INSTANCE`, because `HostClassLoader` is made a singleton. E.g. there is going to be no change in behavior.
- Re. "lock on `ConcurrentHashMap`" - who is using locks on `ConcurrentHashMap`? It is supposed to be lock free...
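The "no change in behavior" point can be stated concretely; a tiny sketch (names hypothetical, mirroring the singleton shape of `HostClassLoader`) showing that with a singleton, `synchronized (this)` and `synchronized (INSTANCE)` acquire the same monitor:

```java
public class SingletonLockDemo {
  private static final SingletonLockDemo INSTANCE = new SingletonLockDemo();

  public static SingletonLockDemo getInstance() {
    return INSTANCE;
  }

  // with a singleton, `this` and INSTANCE are the same object, so
  // synchronized (this) and synchronized (INSTANCE) take the same lock
  public boolean sameMonitor() {
    return this == INSTANCE;
  }

  public static void main(String[] args) {
    System.out.println(getInstance().sameMonitor()); // prints true
  }
}
```

Once the singleton nature is removed, the two forms diverge: `synchronized (this)` scopes the lock per classloader instance, which is the desired end state.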
```diff
@@ -80,7 +80,7 @@ public final class EnsoContext {

   private final EnsoLanguage language;
   private final Env environment;
-  private final HostClassLoader hostClassLoader = new HostClassLoader();
+  private final HostClassLoader hostClassLoader = HostClassLoader.getClassLoader();
```
- Package libraries Java code as Java Modules #10714 is also changing this line, btw.
- there are plans for the future there
```java
if (!isRuntimeModInBootLayer && name.startsWith("org.graalvm")) {
  return polyglotClassLoader.loadClass(name);
}
if (name.startsWith("org.slf4j")) {
  // Delegating to system class loader ensures that log classes are not loaded again
  // and do not require special setup. In other words, it is using log configuration that
  // has been setup by the runner that started the process. See #11641.
  return polyglotClassLoader.loadClass(name);
}
```
- What can be the source of concurrency problems? I am asking myself
- probably just the read and write to `loadedClasses`, right?
- this can be moved out of the `synchronized` block then
The failures were rather non-obvious, but related to classloading, such as:

```
Caused by: java.lang.LinkageError: loader org.enso.interpreter.runtime.HostClassLoader @7dc6887e attempted duplicate class definition for org.enso.base.Text_Utils. (org.enso.base.Text_Utils is in unnamed module of loader org.enso.interpreter.runtime.HostClassLoader @7dc6887e, parent loader 'app')
```

Once I synchronized this, they all stopped and I've never seen them again.
- OK, thanks for sharing the failure: `attempted duplicate class definition for org.enso.base.Text_Utils`
- As far as I can interpret it, it is related to two threads calling into `URLClassLoader.findClass` for the same `name`
  - which then calls `defineClass` twice
  - and that throws the exception on the second attempt
- good to know the `synchronized` block fixes the problem
- it is just too wide a synchronization...
  - using the one with Futures would be more fine-grained
  - but such a change can certainly wait for now - it'd be good to have it before integration, however
```java
    l.getMethods();
  }
  logger.trace("Class {} found, putting in cache", name);
  loadedClasses.put(name, l);
```
- if the concurrency problems are related to read/write to `loadedClasses`
- then we should eliminate them on the scope of `name`
- e.g. don't `synchronize` to avoid parallel classloading, but make sure we call `findClass(name)` only once per each `name`
- I guess a future would do:
```java
private final ConcurrentHashMap<String, CompletableFuture<Class<?>>> loadedClasses =
    new ConcurrentHashMap<>();

public Class<?> loadClass(String name) throws Exception {
  var toResolve = new CompletableFuture<?>[1];
  var f = loadedClasses.computeIfAbsent(name, (t) -> {
    var newF = new CompletableFuture<Class<?>>();
    toResolve[0] = newF;
    return newF;
  });
  if (toResolve[0] != null) {
    // this thread registered the future, so it performs the actual loading
    try {
      f.complete(findClass(name));
    } catch (Throwable e) {
      // complete exceptionally, otherwise waiting threads would block forever
      f.completeExceptionally(e);
    }
  }
  return f.get();
}
```
- if multiple threads request the same `name`
  - we select one of them to handle the `findClass`
  - others wait for the future to be completed
- threads only wait if there is another one loading `name` at the same time
- different values of `name` don't influence each other
- changing signatures of `ExecutionService` to return future-like objects is good
- I would feel safer if we returned `CompletionStage` instead of `CompletableFuture`
  - it more clearly separates the API we want clients of `ExecutionService` to use
  - from the internal methods only for those who create `CompletableFuture`s
- please try to change the methods to return `CompletionStage`
- if it doesn't work, we can live with `CompletableFuture`, but...
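A small sketch (class and method names hypothetical) of why `CompletionStage` makes a safer return type: clients get the chaining methods, while `complete` stays with the provider:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class StageApiDemo {
  private final CompletableFuture<String> internal = new CompletableFuture<>();

  // provider side: only the service itself is allowed to complete the computation
  public void finish(String value) {
    internal.complete(value);
  }

  // client side: CompletionStage exposes chaining (thenApply, whenComplete, ...)
  // but not complete()/completeExceptionally(), so clients cannot forge results
  public CompletionStage<String> result() {
    return internal;
  }

  public static void main(String[] args) {
    var service = new StageApiDemo();
    var upper = service.result().thenApply(String::toUpperCase);
    service.finish("done");
    System.out.println(upper.toCompletableFuture().join()); // prints DONE
  }
}
```

Clients can still escape via `toCompletableFuture()`; returning `internal.minimalCompletionStage()` (Java 9+) would additionally ensure that escape hatch yields only a detached copy.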
```diff
@@ -112,21 +113,25 @@ public EnsoContext getContext() {
   return context;
 }

-public FunctionCallInstrumentationNode.FunctionCall prepareFunctionCall(
+public CompletableFuture<FunctionCallInstrumentationNode.FunctionCall> prepareFunctionCall(
```
- changing the methods of `ExecutionService` to not block is a proper move - thanks for doing that
- moreover we need to return a "chainable" object - so `ExecutionService` API users can act when the computation is finished
- using `CompletableFuture` for that purpose has its downsides
  - it mixes provider vs. user API
  - as it exposes `complete` & co. methods
  - we certainly do not want people who obtain the `CompletableFuture` from this method to call `complete` on it!
- can we return `CompletionStage` here? `CompletionStage` has only the methods that we want clients of the API to use...
```diff
@@ -237,7 +234,7 @@ public void execute(
  * @param onCachedCallback the consumer of the cached value events.
  * @param onExecutedVisualizationCallback the consumer of an executed visualization result.
  */
-public void execute(
+public CompletableFuture<Object> execute(
```
- the diffs are huge, but I only see this `CompletableFuture<...>` return type change - that's OK
- consider adding a `@return` javadoc line
```diff
   return context.getThreadManager().submit(c);
 }

-private static <T> T resultOf(Future<T> future) {
+public static <T> T resultOf(Future<T> future) {
```
- this method doesn't belong in the `ExecutionService` API
- it was fine as an internal helper method, but it is "too ugly" to be exposed in the API
- moreover it is no longer used in `ExecutionService` - please remove it from this class
```diff
@@ -209,6 +209,7 @@ object ProgramExecutionSupport {
     onCachedValueCallback,
     onExecutedVisualizationCallback
   )
+  ExecutionService.resultOf(pending)
```
- this line would be slightly problematic with a pure `CompletionStage`
- but one can always write `stage.toCompletableFuture().get()`...
```scala
    )
  )
val visualizationResult =
  ExecutionService.resultOf(visualizationResultFuture)
```
- do we have to wait here for the result?
- can't this method return `CompletionStage<String>`?
- when we have a reactive `ExecutionService`, it'd be better/sane/consistent to propagate the reactivity further
```scala
      invalidatedExpressions.contains
    )
  } yield visualization
}

/** Removes a visualization from the holder.
```
I've just tried to run `ProjectManagerDistribution` and open a simple project. I see exceptions like:

```
[WARN] [2025-06-06T11:08:31.794] [org.enso.persist] Adding at 1269246 object:
org.enso.compiler.core.ir.Name$Qualified@798b9e67
but there already is:
org.enso.compiler.core.ir.Name$Qualified@724ab5aa
are they equal: true
java.io.IOException: Adding at 1269246 object:
org.enso.compiler.core.ir.Name$Qualified@798b9e67
but there already is:
org.enso.compiler.core.ir.Name$Qualified@724ab5aa
are they equal: true
	at org.enso.persistance/org.enso.persist.PerInputImpl.readIndirect(PerInputImpl.java:228)
	at org.enso.persistance/org.enso.persist.PerInputImpl.readObject(PerInputImpl.java:73)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.IrPersistance$PersistScalaOption.readObject(IrPersistance.java:153)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.IrPersistance$PersistScalaOption.readObject(IrPersistance.java:140)
	at org.enso.persistance/org.enso.persist.Persistance.readWith(Persistance.java:165)
	at org.enso.persistance/org.enso.persist.PerInputImpl.readInline(PerInputImpl.java:66)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.PersistName_MethodReference.readObject(PersistName_MethodReference.java:11)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.PersistName_MethodReference.readObject(PersistName_MethodReference.java:4)
	at org.enso.persistance/org.enso.persist.Persistance.readWith(Persistance.java:165)
	at org.enso.persistance/org.enso.persist.PerInputImpl.readInline(PerInputImpl.java:66)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.PersistMethod_Explicit.readObject(PersistMethod_Explicit.java:11)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.PersistMethod_Explicit.readObject(PersistMethod_Explicit.java:4)
	at org.enso.persistance/org.enso.persist.Persistance.readWith(Persistance.java:165)
	at org.enso.persistance/org.enso.persist.PerInputImpl.readIndirect(PerInputImpl.java:217)
	at org.enso.persistance/org.enso.persist.PerInputImpl.readObject(PerInputImpl.java:73)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.IrPersistance$PersistScalaList.readObject(IrPersistance.java:240)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.IrPersistance$PersistScalaList.readObject(IrPersistance.java:220)
	at org.enso.persistance/org.enso.persist.Persistance.readWith(Persistance.java:165)
	at org.enso.persistance/org.enso.persist.PerInputImpl.readInline(PerInputImpl.java:66)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.PersistModule.readObject(PersistModule.java:13)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.PersistModule.readObject(PersistModule.java:4)
	at org.enso.persistance/org.enso.persist.Persistance.readWith(Persistance.java:165)
	at org.enso.persistance/org.enso.persist.PerInputImpl.readInline(PerInputImpl.java:66)
	at org.enso.persistance/org.enso.persist.PerBufferReference.readObject(PerBufferReference.java:48)
	at org.enso.persistance/org.enso.persist.Persistance$Reference.get(Persistance.java:305)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.IrLazyMap$En.getValue(IrLazyMap.java:57)
	at org.enso.runtime.parser/org.enso.compiler.core.ir.IrLazyMap.get(IrLazyMap.java:37)
	at org.enso.runtime/org.enso.interpreter.caches.ImportExportCache$MapToBindings.findForModule(ImportExportCache.java:145)
```

They are harmless (as there is the `are they equal: true` message, which signals everything is correct), but they clearly show `IrLazyMap` hasn't really been tested/designed for parallel access.
Re propagating futures downstream:
```diff
@@ -319,6 +319,10 @@ class MainModule(serverConfig: LanguageServerConfig, logLevel: Level) {
   RuntimeOptions.JOB_PARALLELISM,
   Runtime.getRuntime.availableProcessors().toString
 )
+extraOptions.put(
+  RuntimeOptions.GUEST_PARALLELISM,
+  3.toString
```
Incremental Integration of Fixes
- I am afraid we won't be able to fix all problems in a single PR
- I would still like us to move forward
- hence I suggest making `GUEST_PARALLELISM=3` an opt-in feature
- by default we would stick to the safe `GUEST_PARALLELISM=1` value
- that way we can integrate and still give early adopters a chance to test the increased parallelism

Pool of One Must be Enough
- however that means we must be able to run with the `1.toString` value
- right now, we cannot
  - the code starves for me - not sure why
- but there is no reason why we shouldn't be able to run with `GUEST_PARALLELISM=1`
- I'll continue the investigation of this
By default, no execution is triggered by a push context request.
Evaluation of a visualization occasionally requires the write compilation lock to load/compile the involved modules. In the current implementation this leads to common lock contention situations. Upgrading from a read compilation lock to a write one leads to deadlock and is not supported by reentrant locks anyway. Always requesting the write compilation lock is also not acceptable, because it impacts performance. The current solution uses retries:
- locations that trigger compilation in visualization evaluation are well known
- we first attempt to evaluate visualizations without a compilation, holding the read lock only
- if the latter fails, we acquire the write lock and retry
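The retry scheme can be sketched with a plain `ReentrantReadWriteLock`; all names here are hypothetical and the runtime's actual locks differ. Note the read lock is fully released before taking the write lock, since `ReentrantReadWriteLock` does not support lock upgrading:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CompileRetryDemo {
  /** Hypothetical signal that evaluation would need compilation first. */
  public static final class NeedsCompilationException extends Exception {}

  /** Hypothetical evaluation step; throws when compilation is required. */
  public interface Evaluation<T> {
    T run() throws NeedsCompilationException;
  }

  private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock();

  public <T> T evaluate(Evaluation<T> optimistic, Evaluation<T> withCompilation) throws Exception {
    compilationLock.readLock().lock();
    try {
      // first attempt: no compilation expected, the read lock is enough
      return optimistic.run();
    } catch (NeedsCompilationException e) {
      // fall through to the retry below; upgrading read -> write would
      // deadlock, so the read lock must be fully released first
    } finally {
      compilationLock.readLock().unlock();
    }
    compilationLock.writeLock().lock();
    try {
      // retry under the write lock, which permits compilation
      return withCompilation.run();
    } finally {
      compilationLock.writeLock().unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    var demo = new CompileRetryDemo();
    var result = demo.<String>evaluate(
        () -> { throw new NeedsCompilationException(); },
        () -> "evaluated-after-compilation");
    System.out.println(result); // prints evaluated-after-compilation
  }
}
```

The window between releasing the read lock and acquiring the write lock means the world may change in between, which is exactly why the second attempt is a full retry rather than a continuation.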
Most runtime visualization unit tests will now fail because of how they use cache preferences. With this PR we rely on the fact that, when upserting visualizations, the runtime cache already has cache preferences (i.e. IDs of expressions) set correctly. So it's a chicken-and-egg problem.
Note that with the current implementation I'm no longer experiencing any deadlocks or lock contention. Notice how a simple expression's visualization is loaded before the reading of a large CSV file finishes. Previously that would be unimaginable, as visualizations would only be triggered after the main execution finished.
Reactive as in Vue.js
- This is a comment about a reactive `RuntimeCache`.
- For a few days I was puzzled: why `rxjava`?
- Just to realize that the RxJava reactivity isn't the reactivity @Frizi and I were talking about in Italy
Motivation: Table.join
- there are "visualizations" that no longer depend just on a single UUID value
- for example `Table.join` depends on `self` and `other`
- support for that was implemented in #9410
- but the support is fragile
- what we want is `watchEffect`...
  - a visualization may access as many UUID values from the cache as it wants
  - the execution is tracked and all the accessed UUIDs are recorded
  - once any of the UUID "observables" changes, re-evaluation of the visualization is triggered
  - and so on, and so on
- these kinds of visualizations that observe many values from the cache will be more and more common
- especially if we connect Y.doc structures with the cache
Motivation: Eviction from the Cache
- when the IDE modifies a line of code that defines a variable
  - it is clear the value associated with that variable needs to be removed from the cache
  - and recomputed on the next computation
- however not only the single value needs to be evicted
  - all others that depend on it need to be evicted as well
- right now we complicate and slow our compiler down with passes like `DataflowAnalysis` & co.
- instead we should use `watchEffect`
  - when evaluating a component/node/assignment to a variable
  - track the execution and record all UUIDs that are accessed while computing the value
  - when putting the computed value into the runtime cache
  - let `watchEffect` connect it with all the observable UUIDs it read
  - as soon as one of them is evicted, evict also all the dependent ones
- this is the Vue.js, Knockout.js & co. client reactivity
- that would allow us to do less during static analysis
- and get better results (as observing the runtime yields a smaller closure than static analysis)
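The eviction scheme above can be sketched as a small dependency-tracking cache. All names are hypothetical; this is single-threaded and a real implementation would need thread safety and cycle handling:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;

// While a value is computed, every cache read is recorded; evicting a key
// then transitively evicts everything that read it (watchEffect-style).
public class TrackingCache {
  private final Map<UUID, Object> values = new HashMap<>();
  private final Map<UUID, Set<UUID>> dependents = new HashMap<>();
  private final Deque<Set<UUID>> tracking = new ArrayDeque<>();

  public Object get(UUID key) {
    if (!tracking.isEmpty()) {
      tracking.peek().add(key); // record the read for the running computation
    }
    return values.get(key);
  }

  public void compute(UUID key, Supplier<Object> body) {
    tracking.push(new HashSet<>());
    Object value = body.get();
    Set<UUID> reads = tracking.pop();
    values.put(key, value);
    for (UUID read : reads) {
      dependents.computeIfAbsent(read, k -> new HashSet<>()).add(key);
    }
  }

  public void evict(UUID key) {
    values.remove(key);
    Set<UUID> deps = dependents.remove(key);
    if (deps != null) {
      deps.forEach(this::evict); // transitively evict dependent values
    }
  }

  public boolean contains(UUID key) {
    return values.containsKey(key);
  }

  public static void main(String[] args) {
    var cache = new TrackingCache();
    UUID other = UUID.randomUUID();
    UUID joined = UUID.randomUUID();
    cache.compute(other, () -> 2);
    cache.compute(joined, () -> (Integer) cache.get(other) + 1); // reads `other`
    cache.evict(other); // evicting `other` also evicts `joined`
    System.out.println(cache.contains(joined)); // prints false
  }
}
```

No static dataflow analysis is consulted: the dependency edges come purely from observing which keys a computation actually read.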
Java Implementation
- I don't think rxjava is a suitable library for this kind of client reactivity
- there is an HTML/Java API that has the right "observables" we need
```scala
val rxJavaVersion = "3.1.10"
val rxJava = Seq(
  "io.reactivex.rxjava3" % "rxjava" % "3.1.10"
```
- I am surprised by the usage of `rxjava`
- sure, it stands for Reactive Java - but it is not the reactive I was thinking about
- when talking about reactive I mean Vue.js reactive - e.g.
  - the reactivity core
  - with its `ref`, `computed`, `watchEffect`, etc.
- or Svelte's reactivity, as shown in the following video
```java
import io.reactivex.rxjava3.observers.DisposableObserver;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import io.reactivex.rxjava3.subjects.Subject;
```
- once I had a discussion with the Helidon guys about reactivity
- just to conclude that server-side reactive is different from the client-side one
- all those backpressures, etc... that is not really useful on the client
- there are similarities, but I am not really knowledgeable about rxjava's `Subject`, `ReplaySubject`, etc.
- all I can tell is that I don't see `ref`, `computed`, `watchEffect` or similar
- thus bear with me...
```java
    Consumer<Object> onNext,
    Consumer<Throwable> onFailure,
    Executor executor) {
  Subject<Object> observable;
```
- what is the role of `observable`?
- what shall it observe?
  - the value associated with a `UUID`?
  - just the single value?
```java
  observables.computeIfAbsent(expressionId, (UUID uuid) -> ReplaySubject.createWithSize(1));
}
var scheduler = Schedulers.from(executor);
var observer =
```
- am I wrong to say that this is just like a `CompletableFuture` that others may "chain" actions to
  - which is triggered when the value is changed?
- for something this simple we probably don't need the `rxjava` 3rd-party library
- there are methods like `thenApply(..., executor)` already in the JDK...
- but when talking about a "reactive cache" I probably wanted something completely different
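For comparison, a minimal JDK-only sketch (all names hypothetical) of the same subscribe-and-notify shape built on `CompletableFuture`; unlike `ReplaySubject` it is single-shot, which hints at where the JDK primitives stop being enough:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class JdkChainDemo {
  // hypothetical, single-shot stand-in for an rxjava Subject: holds one value
  // and notifies subscribers on the supplied executor, JDK only
  private final CompletableFuture<Object> value = new CompletableFuture<>();

  public void subscribe(Consumer<Object> onNext, Consumer<Throwable> onFailure, Executor executor) {
    value.whenCompleteAsync((result, error) -> {
      if (error != null) {
        onFailure.accept(error);
      } else {
        onNext.accept(result);
      }
    }, executor);
  }

  public void publish(Object result) {
    value.complete(result);
  }

  public static void main(String[] args) throws Exception {
    var demo = new JdkChainDemo();
    var seen = new LinkedBlockingQueue<Object>();
    demo.subscribe(seen::add, error -> seen.add("failure"), Runnable::run);
    demo.publish(42);
    System.out.println(seen.take()); // prints 42
  }
}
```

Each subscriber is notified exactly once, for the first published value; repeated per-UUID updates are precisely what `ReplaySubject.createWithSize(1)` adds on top of this.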
When diagnostics are being gathered outside of ThreadManager, execution will blow up due to Truffle restrictions.
`ExecuteJob` needs to run whenever there was no observable present for the requested expression ID. Had to bring back cache invalidation logic to ensure that one can attach a visualization to any expression. This was problematic for situations when the requested expression was part of a larger one whose result had already been cached.
A bug lurked in `HostClassLoader` that was manifesting itself in a hidden way (no symbol found). Now logging the exception so that it is easier to spot the problem.
Bringing back the previous behaviour. Otherwise, it is very likely request handlers will start reporting timeouts - evaluation/execution of visualizations may take a while.
No need to recalculate expression every time.
When evaluating module only once, modifications to visualization modules will not be picked up, as indicated by a failing test.
Parked threads conflict with guest Truffle threads that are waiting at synchronization points. To minimize the chances of thread starvation, we ensure that core threads are not kept alive when done.
Pull Request Description
WIP
This change adds support for executing visualizations in parallel to regular program execution.
Previously, attaching new visualizations would have to wait for the existing evaluation to finish and a visualization would be evaluated in a callback during re-execution. In the presence of expensive (long) computations the delay would be unacceptable.
Closes #10525.
Important Notes
Enabling parallel execution created a few problems, as previously execution was always done sequentially:
- `GUEST_PARALLELISM`
- `HostClassLoader`

TODO:
- `RecomputeContextRequest` is being made (i.e. all values are recomputed)

Checklist
Please ensure that the following checklist has been satisfied before submitting the PR:
- The code conforms to the Scala, Java, TypeScript, and Rust style guides. In case you are using a language not listed above, follow the Rust style guide.
or the Snowflake database integration, a run of the Extra Tests has been scheduled.