Skip to content

Commit 04f8ef7

Browse files
committed
Ported the JobManagerITCase to actor implementation.
1 parent b313eeb commit 04f8ef7

File tree

33 files changed

+1507
-541
lines changed

33 files changed

+1507
-541
lines changed

flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.io.File;
2222
import java.net.InetSocketAddress;
2323

24-
import org.apache.flink.client.minicluster.NepheleMiniCluster;
24+
import org.apache.flink.runtime.minicluster.NepheleMiniCluster;
2525
import org.apache.flink.client.program.Client;
2626
import org.apache.flink.client.program.PackagedProgram;
2727
import org.apache.flink.configuration.Configuration;

flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import java.net.InetSocketAddress;
2121

22-
import org.apache.flink.client.minicluster.NepheleMiniCluster;
22+
import org.apache.flink.runtime.minicluster.NepheleMiniCluster;
2323
import org.apache.flink.client.program.Client;
2424
import org.apache.flink.client.program.ProgramInvocationException;
2525
import org.apache.flink.configuration.Configuration;

flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ protected int list(String[] args) {
509509
}
510510

511511
List<RecentJobEvent> recentJobs = AkkaUtils.<EventCollectorMessages.RecentJobs>ask(jobManager,
512-
EventCollectorMessages.RequestRecentJobs$.MODULE$).asJavaList();
512+
EventCollectorMessages.RequestRecentJobEvents$.MODULE$).asJavaList();
513513

514514
ArrayList<RecentJobEvent> runningJobs = null;
515515
ArrayList<RecentJobEvent> scheduledJobs = null;

flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
import org.apache.flink.api.common.Plan;
2727
import org.apache.flink.api.common.PlanExecutor;
2828
import org.apache.flink.api.common.Program;
29-
import org.apache.flink.client.minicluster.FlinkMiniCluster;
29+
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
3031
import org.apache.flink.runtime.client.JobClient;
3132
import org.apache.flink.runtime.jobgraph.JobGraph;
3233
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -37,6 +38,7 @@
3738
import org.apache.flink.compiler.plan.OptimizedPlan;
3839
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
3940
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
41+
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
4042

4143
/**
4244
* A class for executing a {@link Plan} on a local embedded Flink runtime instance.
@@ -49,7 +51,7 @@ public class LocalExecutor extends PlanExecutor {
4951

5052
private final Object lock = new Object(); // we lock to ensure singleton execution
5153

52-
private FlinkMiniCluster nephele;
54+
private LocalFlinkMiniCluster flink;
5355

5456
// ---------------------------------- config options ------------------------------------------
5557

@@ -141,33 +143,14 @@ public void setDefaultAlwaysCreateDirectory(boolean defaultAlwaysCreateDirectory
141143

142144
public void start() throws Exception {
143145
synchronized (this.lock) {
144-
if (this.nephele == null) {
146+
if (this.flink == null) {
145147

146148
// create the embedded runtime
147-
this.nephele = new FlinkMiniCluster();
148-
149-
// configure it, if values were changed. otherwise the embedded runtime uses the internal defaults
150-
if (jobManagerRpcPort > 0) {
151-
nephele.setJobManagerRpcPort(jobManagerRpcPort);
152-
}
153-
if (taskManagerRpcPort > 0) {
154-
nephele.setTaskManagerRpcPort(jobManagerRpcPort);
155-
}
156-
if (taskManagerDataPort > 0) {
157-
nephele.setTaskManagerDataPort(taskManagerDataPort);
158-
}
159-
if (configDir != null) {
160-
nephele.setConfigDir(configDir);
161-
}
162-
if (hdfsConfigFile != null) {
163-
nephele.setHdfsConfigFile(hdfsConfigFile);
164-
}
165-
nephele.setDefaultOverwriteFiles(defaultOverwriteFiles);
166-
nephele.setDefaultAlwaysCreateDirectory(defaultAlwaysCreateDirectory);
167-
nephele.setTaskManagerNumSlots(taskManagerNumSlots);
149+
this.flink = new LocalFlinkMiniCluster(configDir);
150+
Configuration configuration = new Configuration();
168151

169152
// start it up
170-
this.nephele.start();
153+
this.flink.start(configuration);
171154
} else {
172155
throw new IllegalStateException("The local executor was already started.");
173156
}
@@ -179,9 +162,9 @@ public void start() throws Exception {
179162
*/
180163
public void stop() throws Exception {
181164
synchronized (this.lock) {
182-
if (this.nephele != null) {
183-
this.nephele.stop();
184-
this.nephele = null;
165+
if (this.flink != null) {
166+
this.flink.stop();
167+
this.flink = null;
185168
} else {
186169
throw new IllegalStateException("The local executor was not started.");
187170
}
@@ -210,7 +193,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
210193

211194
// check if we start a session dedicated for this execution
212195
final boolean shutDownAtEnd;
213-
if (this.nephele == null) {
196+
if (this.flink == null) {
214197
// we start a session just for us now
215198
shutDownAtEnd = true;
216199

@@ -235,7 +218,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
235218
NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
236219
JobGraph jobGraph = jgg.compileJobGraph(op);
237220

238-
JobClient jobClient = this.nephele.getJobClient(jobGraph);
221+
JobClient jobClient = this.flink.getJobClient(jobGraph);
239222
JobExecutionResult result = jobClient.submitJobAndWait();
240223
return result;
241224
}

0 commit comments

Comments
 (0)