
Commit 73d61ea

[HUDI-9041] Send commit ack event when reusing current instant
1 parent e89d9cb commit 73d61ea

13 files changed: +293 / -1 lines changed


hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 3 additions & 0 deletions
@@ -445,6 +445,9 @@ private void handleBootstrapEvent(WriteMetadataEvent event) {
       LOG.warn("Reuse current pending Instant {} with {} operationType, "
           + "ignoring empty bootstrap event.", this.instant, WriteOperationType.INSERT.value());
       reset();
+
+      // send commit act event to unblock write tasks
+      sendCommitAckEvents(-1L);
       return;
     }
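
For context, a minimal sketch of what a coordinator-side broadcast like sendCommitAckEvents could look like. This is an illustration only, not the actual Hudi implementation: it assumes the coordinator keeps one registered SubtaskGateway per write task (the test wrappers below register theirs via subtaskReady) and that CommitAckEvent accepts the checkpoint id in its constructor.

  // Illustration-only sketch; the `gateways` field (one per write task) and the
  // CommitAckEvent(long) constructor are assumptions, not the real Hudi code.
  private void sendCommitAckEvents(long checkpointId) {
    for (OperatorCoordinator.SubtaskGateway gateway : this.gateways) {
      if (gateway != null) {
        // -1L means no specific checkpoint is acknowledged; the write tasks only
        // need the ack to stop blocking on the reused pending instant.
        gateway.sendEvent(new CommitAckEvent(checkpointId));
      }
    }
  }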

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java

Lines changed: 24 additions & 0 deletions
@@ -144,6 +144,30 @@ public void testSubtaskFails() throws Exception {
         .end();
   }

+  @Test
+  public void testAppendInsertAfterFailoverWithEmptyCheckpoint() throws Exception {
+    // open the function and ingest data
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10_000L);
+    conf.setString(FlinkOptions.OPERATION, "INSERT");
+    preparePipeline()
+        .assertEmptyDataFiles()
+        // make an empty snapshot
+        .checkpoint(1)
+        .assertEmptyEvent()
+        // trigger a partial failure
+        .subTaskFails(0, 1)
+        .assertNextEvent()
+        // make sure coordinator send an ack event to unblock the writers.
+        .assertNextSubTaskEvent()
+        // write a set of data and check the result.
+        .consume(TestData.DATA_SET_INSERT)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(EXPECTED1)
+        .end();
+  }
+
   // Only when Job level fails with INSERT operationType can we roll back the unfinished instant.
   // Task level failed retry, we should reuse the unfinished Instant with INSERT operationType
   @Test

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java

Lines changed: 6 additions & 0 deletions
@@ -24,6 +24,7 @@
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;

@@ -182,6 +183,11 @@ public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }

+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java

Lines changed: 15 additions & 0 deletions
@@ -23,6 +23,7 @@
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.append.AppendWriteFunction;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;

@@ -58,6 +59,7 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {

   private final MockStreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
+  private final MockSubtaskGateway subtaskGateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
   private StreamWriteOperatorCoordinator coordinator;
   private final MockStateInitializationContext stateInitializationContext;

@@ -79,6 +81,7 @@ public InsertFunctionWrapper(String tablePath, Configuration conf) throws Except
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
     this.gateway = new MockOperatorEventGateway();
+    this.subtaskGateway = new MockSubtaskGateway();
     this.conf = conf;
     this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
     // one function

@@ -121,6 +124,11 @@ public OperatorEvent getNextEvent() {
     return this.gateway.getNextEvent();
   }

+  @Override
+  public OperatorEvent getNextSubTaskEvent() {
+    return this.subtaskGateway.getNextEvent();
+  }
+
   public void checkpointFunction(long checkpointId) throws Exception {
     // checkpoint the coordinator first
     this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());

@@ -174,6 +182,11 @@ public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }

+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   @Override
   public void close() throws Exception {
     this.coordinator.close();

@@ -196,5 +209,7 @@ private void setupWriteFunction() throws Exception {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
     writeFunction.open(conf);
+    // set up subtask gateway
+    coordinator.subtaskReady(0, subtaskGateway);
   }
 }

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockSubtaskGateway.java

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.ExecutionAttemptUtil;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A mock {@link OperatorCoordinator.SubtaskGateway} for unit tests.
+ */
+public class MockSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+  private final LinkedList<OperatorEvent> events = new LinkedList<>();
+
+  @Override
+  public CompletableFuture<Acknowledge> sendEvent(OperatorEvent operatorEvent) {
+    events.add(operatorEvent);
+    return CompletableFuture.completedFuture(Acknowledge.get());
+  }
+
+  @Override
+  public ExecutionAttemptID getExecution() {
+    return ExecutionAttemptUtil.randomId();
+  }
+
+  @Override
+  public int getSubtask() {
+    return 0;
+  }
+
+  public OperatorEvent getNextEvent() {
+    return this.events.isEmpty() ? null : this.events.removeFirst();
+  }
+}
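
The mock simply queues whatever the coordinator sends to the subtask so a test can drain and inspect it. A short usage sketch, mirroring the wiring already shown in InsertFunctionWrapper above (variable names are illustrative only):

  // Register the mock gateway for subtask 0, then drain the events the coordinator sends.
  MockSubtaskGateway subtaskGateway = new MockSubtaskGateway();
  coordinator.subtaskReady(0, subtaskGateway);
  // ... drive the coordinator, e.g. deliver the bootstrap WriteMetadataEvent that
  // makes it reuse the pending instant ...
  OperatorEvent ack = subtaskGateway.getNextEvent();
  // with the fix above, `ack` is expected to be a CommitAckEvent (or null if nothing was sent)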

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java

Lines changed: 6 additions & 0 deletions
@@ -26,6 +26,7 @@
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;

@@ -259,6 +260,11 @@ public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }

+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java

Lines changed: 14 additions & 1 deletion
@@ -21,6 +21,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;

 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;

@@ -49,10 +50,17 @@ public interface TestFunctionWrapper<I> {
   WriteMetadataEvent[] getEventBuffer();

   /**
-   * Returns the next event.
+   * Returns the next event sent to Coordinator.
    */
   OperatorEvent getNextEvent();

+  /**
+   * Returns the next event sent to subtask.
+   */
+  default OperatorEvent getNextSubTaskEvent() {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Snapshot all the functions in the wrapper.
    */

@@ -95,6 +103,11 @@ default void restartCoordinator() throws Exception {
    */
   StreamWriteOperatorCoordinator getCoordinator();

+  /**
+   * Returns the write function.
+   */
+  AbstractWriteFunction getWriteFunction();
+
   /**
    * Returns the data buffer of the write task.
    */

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java

Lines changed: 13 additions & 0 deletions
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.storage.HoodieStorage;

@@ -172,6 +173,18 @@ public TestHarness emptyEventBuffer() {
       return this;
     }

+    /**
+     * Assert the next event exists and handle over it to the coordinator.
+     */
+    public TestHarness assertNextSubTaskEvent() {
+      final OperatorEvent nextEvent = this.pipeline.getNextSubTaskEvent();
+      if (nextEvent != null) {
+        MatcherAssert.assertThat("The Coordinator expect to send an event", nextEvent, instanceOf(CommitAckEvent.class));
+        this.pipeline.getWriteFunction().handleOperatorEvent(nextEvent);
+      }
+      return this;
+    }
+
     /**
      * Assert the next event exists and handle over it to the coordinator.
      */

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+  public static ExecutionAttemptID randomId() {
+    return new ExecutionAttemptID();
+  }
+}

Lines changed: 31 additions & 0 deletions

A second, identical copy of ExecutionAttemptUtil is added as a separate 31-line file, presumably in another Flink-version adapter module; its content matches the file above line for line.
