Skip to content

Commit 635c869

Browse files
committed
[FLINK-4457] Make ExecutionGraph independent of actors.
This introduced types JobStatusListener and ExecutionStatusListener interfaces that replace the ActorRefs and ActorGateway for listeners
1 parent 4e9d177 commit 635c869

File tree

11 files changed

+360
-161
lines changed

11 files changed

+360
-161
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818

1919
package org.apache.flink.runtime.checkpoint;
2020

21-
import akka.actor.ActorSystem;
22-
import akka.actor.PoisonPill;
23-
import akka.actor.Props;
2421
import akka.dispatch.Futures;
2522

2623
import org.apache.flink.api.common.JobID;
@@ -31,8 +28,7 @@
3128
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
3229
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
3330
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
34-
import org.apache.flink.runtime.instance.ActorGateway;
35-
import org.apache.flink.runtime.instance.AkkaActorGateway;
31+
import org.apache.flink.runtime.executiongraph.JobStatusListener;
3632
import org.apache.flink.runtime.jobgraph.JobVertexID;
3733
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
3834
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -57,7 +53,6 @@
5753
import java.util.Set;
5854
import java.util.Timer;
5955
import java.util.TimerTask;
60-
import java.util.UUID;
6156

6257
import static org.apache.flink.util.Preconditions.checkArgument;
6358
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -137,7 +132,7 @@ public class CheckpointCoordinator {
137132
private final Timer timer;
138133

139134
/** Actor that receives status updates from the execution graph this coordinator works for */
140-
private ActorGateway jobStatusListener;
135+
private JobStatusListener jobStatusListener;
141136

142137
/** The number of consecutive failed trigger attempts */
143138
private int numUnsuccessfulCheckpointsTriggers;
@@ -266,12 +261,6 @@ private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
266261
// shut down the thread that handles the timeouts and pending triggers
267262
timer.cancel();
268263

269-
// make sure that the actor does not linger
270-
if (jobStatusListener != null) {
271-
jobStatusListener.tell(PoisonPill.getInstance());
272-
jobStatusListener = null;
273-
}
274-
275264
// clear and discard all pending checkpoints
276265
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
277266
pending.abortError(new Exception("Checkpoint Coordinator is shutting down"));
@@ -903,7 +892,7 @@ public CheckpointIDCounter getCheckpointIdCounter() {
903892
// Periodic scheduling of checkpoints
904893
// --------------------------------------------------------------------------------------------
905894

906-
public void startCheckpointScheduler() throws Exception {
895+
public void startCheckpointScheduler() {
907896
synchronized (lock) {
908897
if (shutdown) {
909898
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
@@ -918,7 +907,7 @@ public void startCheckpointScheduler() throws Exception {
918907
}
919908
}
920909

921-
public void stopCheckpointScheduler() throws Exception {
910+
public void stopCheckpointScheduler() {
922911
synchronized (lock) {
923912
triggerRequestQueued = false;
924913
periodicScheduling = false;
@@ -929,10 +918,14 @@ public void stopCheckpointScheduler() throws Exception {
929918
}
930919

931920
for (PendingCheckpoint p : pendingCheckpoints.values()) {
932-
p.abortError(new Exception("Checkpoint Coordinator is suspending."));
921+
try {
922+
p.abortError(new Exception("Checkpoint Coordinator is suspending."));
923+
} catch (Throwable t) {
924+
LOG.error("Error while disposing pending checkpoint", t);
925+
}
933926
}
934-
pendingCheckpoints.clear();
935927

928+
pendingCheckpoints.clear();
936929
numUnsuccessfulCheckpointsTriggers = 0;
937930
}
938931
}
@@ -941,17 +934,14 @@ public void stopCheckpointScheduler() throws Exception {
941934
// job status listener that schedules / cancels periodic checkpoints
942935
// ------------------------------------------------------------------------
943936

944-
public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
937+
public JobStatusListener createActivatorDeactivator() {
945938
synchronized (lock) {
946939
if (shutdown) {
947940
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
948941
}
949942

950943
if (jobStatusListener == null) {
951-
Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
952-
953-
// wrap the ActorRef in a AkkaActorGateway to support message decoration
954-
jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
944+
jobStatusListener = new CheckpointCoordinatorDeActivator(this);
955945
}
956946

957947
return jobStatusListener;

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,51 +18,32 @@
1818

1919
package org.apache.flink.runtime.checkpoint;
2020

21-
import org.apache.flink.runtime.akka.FlinkUntypedActor;
21+
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.runtime.executiongraph.JobStatusListener;
2223
import org.apache.flink.runtime.jobgraph.JobStatus;
23-
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
24-
import org.apache.flink.util.Preconditions;
2524

26-
import java.util.UUID;
25+
import static org.apache.flink.util.Preconditions.checkNotNull;
2726

2827
/**
2928
* This actor listens to changes in the JobStatus and activates or deactivates the periodic
3029
* checkpoint scheduler.
3130
*/
32-
public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
31+
public class CheckpointCoordinatorDeActivator implements JobStatusListener {
3332

3433
private final CheckpointCoordinator coordinator;
35-
private final UUID leaderSessionID;
36-
37-
public CheckpointCoordinatorDeActivator(
38-
CheckpointCoordinator coordinator,
39-
UUID leaderSessionID) {
4034

41-
LOG.info("Create CheckpointCoordinatorDeActivator");
42-
43-
this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
44-
this.leaderSessionID = leaderSessionID;
35+
public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
36+
this.coordinator = checkNotNull(coordinator);
4537
}
4638

4739
@Override
48-
public void handleMessage(Object message) throws Exception {
49-
if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
50-
JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
51-
52-
if (status == JobStatus.RUNNING) {
53-
// start the checkpoint scheduler
54-
coordinator.startCheckpointScheduler();
55-
} else {
56-
// anything else should stop the trigger for now
57-
coordinator.stopCheckpointScheduler();
58-
}
40+
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
41+
if (newJobStatus == JobStatus.RUNNING) {
42+
// start the checkpoint scheduler
43+
coordinator.startCheckpointScheduler();
44+
} else {
45+
// anything else should stop the trigger for now
46+
coordinator.stopCheckpointScheduler();
5947
}
60-
61-
// we ignore all other messages
62-
}
63-
64-
@Override
65-
public UUID getLeaderSessionID() {
66-
return leaderSessionID;
6748
}
6849
}

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.flink.runtime.executiongraph;
2020

21-
import akka.actor.ActorSystem;
2221
import org.apache.flink.api.common.ExecutionConfig;
2322
import org.apache.flink.api.common.JobID;
2423
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -41,7 +40,6 @@
4140
import org.apache.flink.runtime.execution.ExecutionState;
4241
import org.apache.flink.runtime.execution.SuppressRestartsException;
4342
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
44-
import org.apache.flink.runtime.instance.ActorGateway;
4543
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
4644
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
4745
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -50,15 +48,16 @@
5048
import org.apache.flink.runtime.jobgraph.ScheduleMode;
5149
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
5250
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
53-
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
5451
import org.apache.flink.runtime.query.KvStateLocationRegistry;
5552
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
5653
import org.apache.flink.runtime.util.SerializableObject;
5754
import org.apache.flink.runtime.util.SerializedThrowable;
5855
import org.apache.flink.util.ExceptionUtils;
5956
import org.apache.flink.util.SerializedValue;
57+
6058
import org.slf4j.Logger;
6159
import org.slf4j.LoggerFactory;
60+
6261
import scala.concurrent.ExecutionContext;
6362
import scala.concurrent.duration.FiniteDuration;
6463

@@ -75,12 +74,12 @@
7574
import java.util.Map;
7675
import java.util.NoSuchElementException;
7776
import java.util.Objects;
78-
import java.util.UUID;
7977
import java.util.concurrent.ConcurrentHashMap;
8078
import java.util.concurrent.CopyOnWriteArrayList;
8179
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
8280

8381
import static org.apache.flink.util.Preconditions.checkNotNull;
82+
8483
/**
8584
* The execution graph is the central data structure that coordinates the distributed
8685
* execution of a data flow. It keeps representations of each parallel task, each
@@ -151,12 +150,12 @@ public class ExecutionGraph {
151150
* accessible on all nodes in the cluster. */
152151
private final List<URL> requiredClasspaths;
153152

154-
/** Listeners that receive messages when the entire job switches it status (such as from
155-
* RUNNING to FINISHED) */
156-
private final List<ActorGateway> jobStatusListenerActors;
153+
/** Listeners that receive messages when the entire job switches it status
154+
* (such as from RUNNING to FINISHED) */
155+
private final List<JobStatusListener> jobStatusListeners;
157156

158157
/** Listeners that receive messages whenever a single task execution changes its status */
159-
private final List<ActorGateway> executionListenerActors;
158+
private final List<ExecutionStatusListener> executionListeners;
160159

161160
/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
162161
* the execution graph transitioned into a certain state. The index into this array is the
@@ -284,8 +283,8 @@ public ExecutionGraph(
284283
this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
285284
this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
286285

287-
this.jobStatusListenerActors = new CopyOnWriteArrayList<ActorGateway>();
288-
this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
286+
this.jobStatusListeners = new CopyOnWriteArrayList<>();
287+
this.executionListeners = new CopyOnWriteArrayList<>();
289288

290289
this.stateTimestamps = new long[JobStatus.values().length];
291290
this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
@@ -345,8 +344,6 @@ public void enableSnapshotCheckpointing(
345344
List<ExecutionJobVertex> verticesToTrigger,
346345
List<ExecutionJobVertex> verticesToWaitFor,
347346
List<ExecutionJobVertex> verticesToCommitTo,
348-
ActorSystem actorSystem,
349-
UUID leaderSessionID,
350347
CheckpointIDCounter checkpointIDCounter,
351348
CompletedCheckpointStore checkpointStore,
352349
SavepointStore savepointStore,
@@ -388,8 +385,7 @@ public void enableSnapshotCheckpointing(
388385

389386
// the periodic checkpoint scheduler is activated and deactivated as a result of
390387
// job status changes (running -> on, all other states -> off)
391-
registerJobStatusListener(
392-
checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
388+
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
393389
}
394390

395391
/**
@@ -935,8 +931,8 @@ public void prepareForArchiving() {
935931
intermediateResults.clear();
936932
currentExecutions.clear();
937933
requiredJarFiles.clear();
938-
jobStatusListenerActors.clear();
939-
executionListenerActors.clear();
934+
jobStatusListeners.clear();
935+
executionListeners.clear();
940936

941937
isArchived = true;
942938
}
@@ -1173,45 +1169,52 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
11731169
// Listeners & Observers
11741170
// --------------------------------------------------------------------------------------------
11751171

1176-
public void registerJobStatusListener(ActorGateway listener) {
1172+
public void registerJobStatusListener(JobStatusListener listener) {
11771173
if (listener != null) {
1178-
this.jobStatusListenerActors.add(listener);
1174+
jobStatusListeners.add(listener);
11791175
}
11801176
}
11811177

1182-
public void registerExecutionListener(ActorGateway listener) {
1178+
public void registerExecutionListener(ExecutionStatusListener listener) {
11831179
if (listener != null) {
1184-
this.executionListenerActors.add(listener);
1180+
executionListeners.add(listener);
11851181
}
11861182
}
11871183

11881184
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
1189-
if (jobStatusListenerActors.size() > 0) {
1190-
ExecutionGraphMessages.JobStatusChanged message =
1191-
new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(),
1192-
error == null ? null : new SerializedThrowable(error));
1193-
1194-
for (ActorGateway listener: jobStatusListenerActors) {
1195-
listener.tell(message);
1185+
if (jobStatusListeners.size() > 0) {
1186+
final long timestamp = System.currentTimeMillis();
1187+
final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
1188+
1189+
for (JobStatusListener listener : jobStatusListeners) {
1190+
try {
1191+
listener.jobStatusChanges(jobID, newState, timestamp, serializedError);
1192+
} catch (Throwable t) {
1193+
LOG.warn("Error while notifying JobStatusListener", t);
1194+
}
11961195
}
11971196
}
11981197
}
11991198

1200-
void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
1201-
newExecutionState, Throwable error)
1199+
void notifyExecutionChange(
1200+
JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
1201+
ExecutionState newExecutionState, Throwable error)
12021202
{
12031203
ExecutionJobVertex vertex = getJobVertex(vertexId);
12041204

1205-
if (executionListenerActors.size() > 0) {
1206-
String message = error == null ? null : ExceptionUtils.stringifyException(error);
1207-
ExecutionGraphMessages.ExecutionStateChanged actorMessage =
1208-
new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId, vertex.getJobVertex().getName(),
1209-
vertex.getParallelism(), subtask,
1210-
executionID, newExecutionState,
1211-
System.currentTimeMillis(), message);
1212-
1213-
for (ActorGateway listener : executionListenerActors) {
1214-
listener.tell(actorMessage);
1205+
if (executionListeners.size() > 0) {
1206+
final String message = error == null ? null : ExceptionUtils.stringifyException(error);
1207+
final long timestamp = System.currentTimeMillis();
1208+
1209+
for (ExecutionStatusListener listener : executionListeners) {
1210+
try {
1211+
listener.executionStatusChanged(
1212+
jobID, vertexId, vertex.getJobVertex().getName(),
1213+
vertex.getParallelism(), subtask, executionID, newExecutionState,
1214+
timestamp, message);
1215+
} catch (Throwable t) {
1216+
LOG.warn("Error while notifying ExecutionStatusListener", t);
1217+
}
12151218
}
12161219
}
12171220

0 commit comments

Comments
 (0)