Skip to content

Commit eac6088

Browse files
committed
[FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' to have more intuitive names
getResourceID() --> getTaskManagerID() getInstanceConnectionInfo() --> getTaskManagerLocation()
1 parent aaa474a commit eac6088

File tree

19 files changed

+319
-324
lines changed

19 files changed

+319
-324
lines changed

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public String handleRequest(Map<String, String> pathParams, Map<String, String>
8585
gen.writeStartObject();
8686
gen.writeStringField("id", instance.getId().toString());
8787
gen.writeStringField("path", instance.getActorGateway().path());
88-
gen.writeNumberField("dataPort", instance.getInstanceConnectionInfo().dataPort());
88+
gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
8989
gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
9090
gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
9191
gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());

flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,20 @@ public class PartialInputChannelDeploymentDescriptor {
4444
private final ResultPartitionID partitionID;
4545

4646
/** The partition connection info. */
47-
private final TaskManagerLocation partitionConnectionInfo;
47+
private final TaskManagerLocation partitionTaskManagerLocation;
4848

4949
/** The partition connection index. */
5050
private final int partitionConnectionIndex;
5151

5252
public PartialInputChannelDeploymentDescriptor(
5353
IntermediateDataSetID resultId,
5454
ResultPartitionID partitionID,
55-
TaskManagerLocation partitionConnectionInfo,
55+
TaskManagerLocation partitionTaskManagerLocation,
5656
int partitionConnectionIndex) {
5757

5858
this.resultId = checkNotNull(resultId);
5959
this.partitionID = checkNotNull(partitionID);
60-
this.partitionConnectionInfo = checkNotNull(partitionConnectionInfo);
60+
this.partitionTaskManagerLocation = checkNotNull(partitionTaskManagerLocation);
6161
this.partitionConnectionIndex = partitionConnectionIndex;
6262
}
6363

@@ -66,23 +66,20 @@ public PartialInputChannelDeploymentDescriptor(
6666
*
6767
* @see InputChannelDeploymentDescriptor
6868
*/
69-
public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(
70-
Execution consumerExecution) {
69+
public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(Execution consumerExecution) {
70+
checkNotNull(consumerExecution, "consumerExecution");
7171

72-
checkNotNull(consumerExecution, "Consumer execution null");
73-
74-
TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
75-
76-
checkNotNull(consumerConnectionInfo, "Consumer connection info null");
72+
TaskManagerLocation consumerLocation = consumerExecution.getAssignedResourceLocation();
73+
checkNotNull(consumerLocation, "Consumer connection info null");
7774

7875
final ResultPartitionLocation partitionLocation;
7976

80-
if (consumerConnectionInfo.equals(partitionConnectionInfo)) {
77+
if (consumerLocation.equals(partitionTaskManagerLocation)) {
8178
partitionLocation = ResultPartitionLocation.createLocal();
8279
}
8380
else {
8481
partitionLocation = ResultPartitionLocation.createRemote(
85-
new ConnectionID(partitionConnectionInfo, partitionConnectionIndex));
82+
new ConnectionID(partitionTaskManagerLocation, partitionConnectionIndex));
8683
}
8784

8885
return new InputChannelDeploymentDescriptor(partitionID, partitionLocation);

flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,11 @@ public class Instance implements SlotOwner {
5252
private final ActorGateway actorGateway;
5353

5454
/** The instance connection information for the data transfer. */
55-
private final TaskManagerLocation connectionInfo;
55+
private final TaskManagerLocation location;
5656

5757
/** A description of the resources of the task manager */
5858
private final HardwareDescription resources;
5959

60-
/** The ID identifies the resource the task manager runs on */
61-
private final ResourceID resourceId;
62-
6360
/** The ID identifying the taskManager. */
6461
private final InstanceID instanceId;
6562

@@ -90,22 +87,19 @@ public class Instance implements SlotOwner {
9087
* Constructs an instance reflecting a registered TaskManager.
9188
*
9289
* @param actorGateway The actor gateway to communicate with the remote instance
93-
* @param connectionInfo The remote connection where the task manager receives requests.
94-
* @param resourceId The resource id which denotes the resource the task manager uses.
90+
* @param location The remote connection where the task manager receives requests.
9591
* @param id The id under which the taskManager is registered.
9692
* @param resources The resources available on the machine.
9793
* @param numberOfSlots The number of task slots offered by this taskManager.
9894
*/
9995
public Instance(
10096
ActorGateway actorGateway,
101-
TaskManagerLocation connectionInfo,
102-
ResourceID resourceId,
97+
TaskManagerLocation location,
10398
InstanceID id,
10499
HardwareDescription resources,
105100
int numberOfSlots) {
106101
this.actorGateway = actorGateway;
107-
this.connectionInfo = connectionInfo;
108-
this.resourceId = resourceId;
102+
this.location = location;
109103
this.instanceId = id;
110104
this.resources = resources;
111105
this.numberOfSlots = numberOfSlots;
@@ -120,8 +114,8 @@ public Instance(
120114
// Properties
121115
// --------------------------------------------------------------------------------------------
122116

123-
public ResourceID getResourceId() {
124-
return resourceId;
117+
public ResourceID getTaskManagerID() {
118+
return location.getResourceID();
125119
}
126120

127121
public InstanceID getId() {
@@ -246,7 +240,7 @@ public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
246240
return null;
247241
}
248242
else {
249-
SimpleSlot slot = new SimpleSlot(jobID, this, connectionInfo, nextSlot, actorGateway);
243+
SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, actorGateway);
250244
allocatedSlots.add(slot);
251245
return slot;
252246
}
@@ -284,7 +278,7 @@ public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sha
284278
}
285279
else {
286280
SharedSlot slot = new SharedSlot(
287-
jobID, this, connectionInfo, nextSlot, actorGateway, sharingGroupAssignment);
281+
jobID, this, location, nextSlot, actorGateway, sharingGroupAssignment);
288282
allocatedSlots.add(slot);
289283
return slot;
290284
}
@@ -355,8 +349,8 @@ public ActorGateway getActorGateway() {
355349
return actorGateway;
356350
}
357351

358-
public TaskManagerLocation getInstanceConnectionInfo() {
359-
return connectionInfo;
352+
public TaskManagerLocation getTaskManagerLocation() {
353+
return location;
360354
}
361355

362356
public int getNumberOfAvailableSlots() {
@@ -405,7 +399,7 @@ public void removeSlotListener() {
405399

406400
@Override
407401
public String toString() {
408-
return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
402+
return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(),
409403
numberOfSlots, (actorGateway != null ? actorGateway.path() : "No instance gateway"));
410404
}
411405
}

flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,21 +138,20 @@ public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport)
138138
* for the job execution.
139139
*
140140
* @param taskManager ActorRef to the TaskManager which wants to be registered
141-
* @param resourceID The resource id of the TaskManager
142-
* @param connectionInfo ConnectionInfo of the TaskManager
141+
* @param taskManagerLocation Location info of the TaskManager
143142
* @param resources Hardware description of the TaskManager
144143
* @param numberOfSlots Number of available slots on the TaskManager
145144
* @param leaderSessionID The current leader session ID of the JobManager
146145
* @return The assigned InstanceID of the registered task manager
147146
*/
148147
public InstanceID registerTaskManager(
149148
ActorRef taskManager,
150-
ResourceID resourceID,
151-
TaskManagerLocation connectionInfo,
149+
TaskManagerLocation taskManagerLocation,
152150
HardwareDescription resources,
153151
int numberOfSlots,
154-
UUID leaderSessionID){
155-
synchronized(this.lock){
152+
UUID leaderSessionID) {
153+
154+
synchronized (this.lock) {
156155
if (this.isShutdown) {
157156
throw new IllegalStateException("InstanceManager is shut down.");
158157
}
@@ -174,20 +173,19 @@ public InstanceID registerTaskManager(
174173

175174
InstanceID instanceID = new InstanceID();
176175

177-
Instance host = new Instance(actorGateway, connectionInfo, resourceID, instanceID,
178-
resources, numberOfSlots);
176+
Instance host = new Instance(actorGateway, taskManagerLocation, instanceID, resources, numberOfSlots);
179177

180178
registeredHostsById.put(instanceID, host);
181179
registeredHostsByConnection.put(taskManager, host);
182-
registeredHostsByResource.put(resourceID, host);
180+
registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);
183181

184182
totalNumberOfAliveTaskSlots += numberOfSlots;
185183

186184
if (LOG.isInfoEnabled()) {
187185
LOG.info(String.format("Registered TaskManager at %s (%s) as %s. " +
188186
"Current number of registered hosts is %d. " +
189187
"Current number of alive task slots is %d.",
190-
connectionInfo.getHostname(),
188+
taskManagerLocation.getHostname(),
191189
taskManager.path(),
192190
instanceID,
193191
registeredHostsById.size(),
@@ -217,7 +215,7 @@ public void unregisterTaskManager(ActorRef instanceID, boolean terminated){
217215

218216
registeredHostsByConnection.remove(host);
219217
registeredHostsById.remove(instance.getId());
220-
registeredHostsByResource.remove(instance.getResourceId());
218+
registeredHostsByResource.remove(instance.getTaskManagerID());
221219

222220
if (terminated) {
223221
deadHosts.add(instance.getActorGateway().actor());

flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
363363

364364
// if the instance has further available slots, re-add it to the set of available resources.
365365
if (instanceToUse.hasResourcesAvailable()) {
366-
this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
366+
this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
367367
}
368368

369369
if (slot != null) {
@@ -425,7 +425,7 @@ protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex,
425425

426426
// if the instance has further available slots, re-add it to the set of available resources.
427427
if (instanceToUse.hasResourcesAvailable()) {
428-
this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
428+
this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
429429
}
430430

431431
if (sharedSlot != null) {
@@ -469,7 +469,7 @@ private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requ
469469
while (this.newlyAvailableInstances.size() > 0) {
470470
Instance queuedInstance = this.newlyAvailableInstances.poll();
471471
if (queuedInstance != null) {
472-
this.instancesWithAvailableResources.put(queuedInstance.getResourceId(), queuedInstance);
472+
this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
473473
}
474474
}
475475

@@ -583,7 +583,7 @@ private void handleNewSlot() {
583583
}
584584
}
585585
else {
586-
this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
586+
this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
587587
}
588588
}
589589
}
@@ -649,7 +649,7 @@ public void newInstanceAvailable(Instance instance) {
649649
instance.setSlotAvailabilityListener(this);
650650

651651
// store the instance in the by-host-lookup
652-
String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
652+
String instanceHostName = instance.getTaskManagerLocation().getHostname();
653653
Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
654654
if (instanceSet == null) {
655655
instanceSet = new HashSet<Instance>();
@@ -658,7 +658,7 @@ public void newInstanceAvailable(Instance instance) {
658658
instanceSet.add(instance);
659659

660660
// add it to the available resources and let potential waiters know
661-
this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
661+
this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
662662

663663
// add all slots as available
664664
for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
@@ -693,9 +693,9 @@ private void removeInstance(Instance instance) {
693693
}
694694

695695
allInstances.remove(instance);
696-
instancesWithAvailableResources.remove(instance.getResourceId());
696+
instancesWithAvailableResources.remove(instance.getTaskManagerID());
697697

698-
String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
698+
String instanceHostName = instance.getTaskManagerLocation().getHostname();
699699
Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
700700
if (instanceSet != null) {
701701
instanceSet.remove(instance);
@@ -795,7 +795,7 @@ private void processNewlyAvailableInstances() {
795795

796796
while ((instance = newlyAvailableInstances.poll()) != null) {
797797
if (instance.hasResourcesAvailable()) {
798-
instancesWithAvailableResources.put(instance.getResourceId(), instance);
798+
instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
799799
}
800800
}
801801
}

flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ class JobManager(
349349
currentResourceManager = Option(msg.resourceManager())
350350

351351
val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
352-
instance => instance.getResourceId).toList.asJava
352+
instance => instance.getTaskManagerID).toList.asJava
353353

354354
// confirm registration and send known task managers with their resource ids
355355
sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
@@ -425,7 +425,6 @@ class JobManager(
425425
try {
426426
val instanceID = instanceManager.registerTaskManager(
427427
taskManager,
428-
resourceId,
429428
connectionInfo,
430429
hardwareInformation,
431430
numberOfSlots,

0 commit comments

Comments
 (0)