Skip to content

Commit 4a100fa

Browse files
kl0u authored and aljoscha committed
[hotfix] Fixes unstable ContinuousFileMonitoringTest.
This closes apache#2446
1 parent bdf9f86 commit 4a100fa

File tree

1 file changed

+36
-24
lines changed

1 file changed

+36
-24
lines changed

flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -237,26 +237,36 @@ public void testFilePathFiltering() throws Exception {
237237

238238
@Test
239239
public void testFileSplitMonitoringReprocessWithAppended() throws Exception {
240-
Set<String> uniqFilesFound = new HashSet<>();
240+
final Set<String> uniqFilesFound = new HashSet<>();
241241

242242
FileCreator fc = new FileCreator(INTERVAL, NO_OF_FILES);
243243
fc.start();
244244

245-
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
246-
format.setFilesFilter(FilePathFilter.createDefaultFilter());
247-
ContinuousFileMonitoringFunction<String> monitoringFunction =
248-
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
249-
FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
250-
251-
monitoringFunction.open(new Configuration());
252-
monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
245+
Thread t = new Thread(new Runnable() {
246+
@Override
247+
public void run() {
248+
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
249+
format.setFilesFilter(FilePathFilter.createDefaultFilter());
250+
ContinuousFileMonitoringFunction<String> monitoringFunction =
251+
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
252+
FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
253+
254+
try {
255+
monitoringFunction.open(new Configuration());
256+
monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
257+
} catch (Exception e) {
258+
// do nothing as we interrupted the thread.
259+
}
260+
}
261+
});
262+
t.start();
253263

254264
// wait until the sink also sees all the splits.
255265
synchronized (uniqFilesFound) {
256-
while (uniqFilesFound.size() < NO_OF_FILES) {
257-
uniqFilesFound.wait(7 * INTERVAL);
258-
}
266+
uniqFilesFound.wait();
259267
}
268+
t.interrupt();
269+
fc.join();
260270

261271
Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
262272
Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
@@ -281,13 +291,15 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {
281291
Set<String> uniqFilesFound = new HashSet<>();
282292

283293
FileCreator fc = new FileCreator(INTERVAL, 1);
294+
Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
284295
fc.start();
285296

286297
// to make sure that at least one file is created
287-
Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
288-
synchronized (filesCreated) {
289-
if (filesCreated.size() == 0) {
290-
filesCreated.wait();
298+
if (filesCreated.size() == 0) {
299+
synchronized (filesCreated) {
300+
if (filesCreated.size() == 0) {
301+
filesCreated.wait();
302+
}
291303
}
292304
}
293305
Assert.assertTrue(fc.getFilesCreated().size() >= 1);
@@ -391,17 +403,17 @@ public void collect(FileInputSplit element) {
391403
Assert.fail("Duplicate file: " + filePath);
392404
}
393405

394-
filesFound.add(filePath);
395-
try {
396-
if (filesFound.size() == NO_OF_FILES) {
397-
this.src.cancel();
398-
this.src.close();
399-
synchronized (filesFound) {
406+
synchronized (filesFound) {
407+
filesFound.add(filePath);
408+
try {
409+
if (filesFound.size() == NO_OF_FILES) {
410+
this.src.cancel();
411+
this.src.close();
400412
filesFound.notifyAll();
401413
}
414+
} catch (Exception e) {
415+
e.printStackTrace();
402416
}
403-
} catch (Exception e) {
404-
e.printStackTrace();
405417
}
406418
}
407419

0 commit comments

Comments (0)