Skip to content

Commit 34cda87

Browse files
committed
[FLINK-4490] [distributed coordination] (part 1) Change InstanceConnectionInfo to TaskManagerLocation
This adds the ResourceId to the TaskManagerLocation
1 parent e227b10 commit 34cda87

File tree

31 files changed

+276
-289
lines changed

31 files changed

+276
-289
lines changed

flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,19 @@
1919
package org.apache.flink.mesos.runtime.clusterframework
2020

2121
import org.apache.flink.runtime.clusterframework.types.ResourceID
22-
import org.apache.flink.runtime.instance.InstanceConnectionInfo
2322
import org.apache.flink.runtime.io.disk.iomanager.IOManager
2423
import org.apache.flink.runtime.io.network.NetworkEnvironment
2524
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
2625
import org.apache.flink.runtime.memory.MemoryManager
27-
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
26+
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
2827

2928
/** An extension of the TaskManager that listens for additional Mesos-related
3029
* messages.
3130
*/
3231
class MesosTaskManager(
3332
config: TaskManagerConfiguration,
3433
resourceID: ResourceID,
35-
connectionInfo: InstanceConnectionInfo,
34+
taskManagerLocation: TaskManagerLocation,
3635
memoryManager: MemoryManager,
3736
ioManager: IOManager,
3837
network: NetworkEnvironment,
@@ -41,7 +40,7 @@ class MesosTaskManager(
4140
extends TaskManager(
4241
config,
4342
resourceID,
44-
connectionInfo,
43+
taskManagerLocation,
4544
memoryManager,
4645
ioManager,
4746
network,

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.fasterxml.jackson.core.JsonGenerator;
2222
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
2323
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
24-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
24+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
2525
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
2626
import org.apache.flink.util.ExceptionUtils;
2727

@@ -66,7 +66,7 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
6666
break;
6767
}
6868

69-
InstanceConnectionInfo location = task.getCurrentAssignedResourceLocation();
69+
TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
7070
String locationString = location != null ?
7171
location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
7272

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.flink.runtime.execution.ExecutionState;
2727
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
2828
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
29-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
29+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
3030
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
3131

3232
import java.io.StringWriter;
@@ -61,7 +61,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
6161
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
6262
final ExecutionState status = vertex.getExecutionState();
6363

64-
InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
64+
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
6565
String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
6666

6767
long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.flink.runtime.execution.ExecutionState;
2626
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
2727
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
28-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
28+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
2929
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
3030

3131
import java.io.StringWriter;
@@ -51,7 +51,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
5151
Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>();
5252

5353
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
54-
InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
54+
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
5555
String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
5656

5757
List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager);

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
2626
import org.apache.flink.runtime.execution.ExecutionState;
2727
import org.apache.flink.runtime.executiongraph.Execution;
28-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
28+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
2929
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
3030

3131
import java.io.StringWriter;
@@ -45,7 +45,7 @@ public String handleRequest(Execution execAttempt, Map<String, String> params) t
4545
final ExecutionState status = execAttempt.getState();
4646
final long now = System.currentTimeMillis();
4747

48-
InstanceConnectionInfo location = execAttempt.getAssignedResourceLocation();
48+
TaskManagerLocation location = execAttempt.getAssignedResourceLocation();
4949
String locationString = location == null ? "(unassigned)" : location.getHostname();
5050

5151
long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
2424
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
2525
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
26-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
26+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
2727
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
2828

2929
import java.io.StringWriter;
@@ -52,7 +52,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
5252
int num = 0;
5353
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
5454

55-
InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
55+
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
5656
String locationString = location == null ? "(unassigned)" : location.getHostname();
5757

5858
gen.writeStartObject();

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.flink.runtime.execution.ExecutionState;
2424
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
2525
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
26-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
26+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
2727
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
2828

2929
import java.io.StringWriter;
@@ -70,7 +70,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
7070
gen.writeStartObject();
7171
gen.writeNumberField("subtask", num++);
7272

73-
InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
73+
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
7474
String locationString = location == null ? "(unassigned)" : location.getHostname();
7575
gen.writeStringField("host", locationString);
7676

flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.runtime.executiongraph.Execution;
2222
import org.apache.flink.runtime.executiongraph.IntermediateResult;
2323
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
24-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
24+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
2525
import org.apache.flink.runtime.io.network.ConnectionID;
2626
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
2727
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -44,15 +44,15 @@ public class PartialInputChannelDeploymentDescriptor {
4444
private final ResultPartitionID partitionID;
4545

4646
/** The partition connection info. */
47-
private final InstanceConnectionInfo partitionConnectionInfo;
47+
private final TaskManagerLocation partitionConnectionInfo;
4848

4949
/** The partition connection index. */
5050
private final int partitionConnectionIndex;
5151

5252
public PartialInputChannelDeploymentDescriptor(
5353
IntermediateDataSetID resultId,
5454
ResultPartitionID partitionID,
55-
InstanceConnectionInfo partitionConnectionInfo,
55+
TaskManagerLocation partitionConnectionInfo,
5656
int partitionConnectionIndex) {
5757

5858
this.resultId = checkNotNull(resultId);
@@ -71,7 +71,7 @@ public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(
7171

7272
checkNotNull(consumerExecution, "Consumer execution null");
7373

74-
InstanceConnectionInfo consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
74+
TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
7575

7676
checkNotNull(consumerConnectionInfo, "Consumer connection info null");
7777

@@ -107,7 +107,7 @@ public static PartialInputChannelDeploymentDescriptor fromEdge(
107107
final IntermediateResult result = partition.getIntermediateResult();
108108

109109
final IntermediateDataSetID resultId = result.getId();
110-
final InstanceConnectionInfo partitionConnectionInfo = producer.getAssignedResourceLocation();
110+
final TaskManagerLocation partitionConnectionInfo = producer.getAssignedResourceLocation();
111111
final int partitionConnectionIndex = result.getConnectionIndex();
112112

113113
return new PartialInputChannelDeploymentDescriptor(

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
3232
import org.apache.flink.runtime.execution.ExecutionState;
3333
import org.apache.flink.runtime.instance.Instance;
34-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
34+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
3535
import org.apache.flink.runtime.instance.ActorGateway;
3636
import org.apache.flink.runtime.instance.SimpleSlot;
3737
import org.apache.flink.runtime.io.network.ConnectionID;
@@ -133,7 +133,7 @@ public class Execution {
133133

134134
private volatile Throwable failureCause; // once assigned, never changes
135135

136-
private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
136+
private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution
137137

138138
private ChainedStateHandle<StreamStateHandle> chainedStateHandle;
139139

@@ -147,7 +147,7 @@ public class Execution {
147147

148148
/* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten
149149
* by partial accumulators on a late heartbeat*/
150-
private final SerializableObject accumulatorLock = new SerializableObject();
150+
private final Object accumulatorLock = new Object();
151151

152152
/* Continuously updated map of user-defined accumulators */
153153
private volatile Map<String, Accumulator<?, ?>> userAccumulators;
@@ -202,7 +202,7 @@ public SimpleSlot getAssignedResource() {
202202
return assignedResource;
203203
}
204204

205-
public InstanceConnectionInfo getAssignedResourceLocation() {
205+
public TaskManagerLocation getAssignedResourceLocation() {
206206
return assignedResourceLocation;
207207
}
208208

flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
2929
import org.apache.flink.runtime.execution.ExecutionState;
3030
import org.apache.flink.runtime.instance.Instance;
31-
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
31+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
3232
import org.apache.flink.runtime.instance.ActorGateway;
3333
import org.apache.flink.runtime.instance.SimpleSlot;
3434
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -228,7 +228,7 @@ public SimpleSlot getCurrentAssignedResource() {
228228
return currentExecution.getAssignedResource();
229229
}
230230

231-
public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
231+
public TaskManagerLocation getCurrentAssignedResourceLocation() {
232232
return currentExecution.getAssignedResourceLocation();
233233
}
234234

0 commit comments

Comments
 (0)