
Commit 1943245

gejinzh, gejin, and cshuo authored
[hudi-9041] Send commit ack event when reusing current instant (#12849)
* Solve problem #12820: send a commit ack event when reusing the current instant recovered from a checkpoint
* Fix a compile error in MockSubtaskGateway.java

---------

Co-authored-by: gejin <[email protected]>
Co-authored-by: cshuo <[email protected]>
1 parent f5de662 commit 1943245

File tree

16 files changed: +361 −1 lines changed
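
The scenario behind the fix, in brief: after a task-level failover the coordinator may reuse the pending instant recovered from the checkpoint, and it previously dropped the empty bootstrap event without replying; the restarted write tasks, however, wait for a CommitAckEvent before resuming, so they would block until the timeout configured by FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT expired. The sketch below only illustrates that wait-for-ack pattern in isolation; it is a simplified stand-in with made-up names, not Hudi's actual StreamWriteFunction code.

// Simplified illustration only (hypothetical class, not Hudi's implementation):
// a write task blocks on a latch until the coordinator's commit ack arrives,
// failing after a timeout such as the one FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT configures.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CommitAckBarrier {

  private final CountDownLatch ackLatch = new CountDownLatch(1);

  /** Invoked from the operator-event path when a CommitAckEvent is received. */
  void onCommitAck() {
    ackLatch.countDown();
  }

  /** Invoked by the write task before it starts writing to the next instant. */
  void awaitAck(long timeoutMs) throws InterruptedException, TimeoutException {
    if (!ackLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
      throw new TimeoutException("No commit ack received from the coordinator within " + timeoutMs + " ms");
    }
  }
}

With the coordinator change below, the ack is also sent when the pending instant is reused, so a wait like this returns immediately instead of timing out.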

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 3 additions & 0 deletions

@@ -451,6 +451,9 @@ private void handleBootstrapEvent(WriteMetadataEvent event) {
       LOG.warn("Reuse current pending Instant {} with {} operationType, "
           + "ignoring empty bootstrap event.", this.instant, WriteOperationType.INSERT.value());
       reset();
+
+      // send commit ack event to unblock write tasks
+      sendCommitAckEvents(-1L);
       return;
     }

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java

Lines changed: 24 additions & 0 deletions

@@ -126,6 +126,30 @@ public void testSubtaskFails() throws Exception {
         .end();
   }
 
+  @Test
+  public void testAppendInsertAfterFailoverWithEmptyCheckpoint() throws Exception {
+    // open the function and ingest data
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10_000L);
+    conf.setString(FlinkOptions.OPERATION, "INSERT");
+    preparePipeline()
+        .assertEmptyDataFiles()
+        // make an empty snapshot
+        .checkpoint(1)
+        .assertEmptyEvent()
+        // trigger a partial failure
+        .subTaskFails(0, 1)
+        .assertNextEvent()
+        // make sure the coordinator sends an ack event to unblock the writers.
+        .assertNextSubTaskEvent()
+        // write a set of data and check the result.
+        .consume(TestData.DATA_SET_INSERT)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(EXPECTED1)
+        .end();
+  }
+
   // Only when Job level fails with INSERT operationType can we roll back the unfinished instant.
   // Task level failed retry, we should reuse the unfinished Instant with INSERT operationType
   @Test

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java

Lines changed: 6 additions & 0 deletions

@@ -25,6 +25,7 @@
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -195,6 +196,11 @@ public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }
 
+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java

Lines changed: 6 additions & 0 deletions

@@ -29,6 +29,7 @@
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.sink.bulk.sort.SortOperator;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
@@ -176,6 +177,11 @@ public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }
 
+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java

Lines changed: 15 additions & 0 deletions

@@ -23,6 +23,7 @@
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.append.AppendWriteFunction;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
@@ -58,6 +59,7 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
 
   private final MockStreamingRuntimeContext runtimeContext;
   private final MockOperatorEventGateway gateway;
+  private final MockSubtaskGateway subtaskGateway;
   private final MockOperatorCoordinatorContext coordinatorContext;
   private StreamWriteOperatorCoordinator coordinator;
   private final MockStateInitializationContext stateInitializationContext;
@@ -79,6 +81,7 @@ public InsertFunctionWrapper(String tablePath, Configuration conf) throws Except
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
     this.gateway = new MockOperatorEventGateway();
+    this.subtaskGateway = new MockSubtaskGateway();
     this.conf = conf;
     this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
     // one function
@@ -121,6 +124,11 @@ public OperatorEvent getNextEvent() {
     return this.gateway.getNextEvent();
   }
 
+  @Override
+  public OperatorEvent getNextSubTaskEvent() {
+    return this.subtaskGateway.getNextEvent();
+  }
+
   public void checkpointFunction(long checkpointId) throws Exception {
     // checkpoint the coordinator first
     this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
@@ -174,6 +182,11 @@ public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }
 
+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   @Override
   public void close() throws Exception {
     this.coordinator.close();
@@ -196,5 +209,7 @@ private void setupWriteFunction() throws Exception {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
     writeFunction.open(conf);
+    // set up subtask gateway
+    coordinator.subtaskReady(0, subtaskGateway);
   }
 }

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockSubtaskGateway.java

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.ExecutionAttemptUtil;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A mock {@link OperatorCoordinator.SubtaskGateway} for unit tests.
+ */
+public class MockSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+  private final LinkedList<OperatorEvent> events = new LinkedList<>();
+
+  @Override
+  public CompletableFuture<Acknowledge> sendEvent(OperatorEvent operatorEvent) {
+    events.add(operatorEvent);
+    return CompletableFuture.completedFuture(Acknowledge.get());
+  }
+
+  @Override
+  public ExecutionAttemptID getExecution() {
+    return ExecutionAttemptUtil.randomId();
+  }
+
+  @Override
+  public int getSubtask() {
+    return 0;
+  }
+
+  public OperatorEvent getNextEvent() {
+    return this.events.isEmpty() ? null : this.events.removeFirst();
+  }
+}
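
For context on how the mock is consumed: the InsertFunctionWrapper hunk above registers it with the coordinator via coordinator.subtaskReady(0, subtaskGateway) and later drains it through getNextSubTaskEvent(). The standalone sketch below (hypothetical class name and placeholder event, not part of the commit) just demonstrates the mock's FIFO contract.

// Hypothetical usage sketch, not part of the commit; the anonymous OperatorEvent
// stands in for the CommitAckEvent the coordinator would normally send.
package org.apache.hudi.sink.utils;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;

public final class MockSubtaskGatewayUsageSketch {

  public static void main(String[] args) {
    MockSubtaskGateway gateway = new MockSubtaskGateway();

    // In the wrappers above the coordinator is wired to this gateway via
    // coordinator.subtaskReady(0, gateway) and does the sending itself;
    // here we call sendEvent directly to keep the sketch self-contained.
    OperatorEvent placeholder = new OperatorEvent() {};
    gateway.sendEvent(placeholder);

    // Events are replayed to the test in FIFO order; an empty queue yields null.
    if (gateway.getNextEvent() != placeholder || gateway.getNextEvent() != null) {
      throw new AssertionError("unexpected gateway behavior");
    }
  }
}
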

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java

Lines changed: 6 additions & 0 deletions

@@ -28,6 +28,7 @@
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -274,6 +275,11 @@ public StreamWriteOperatorCoordinator getCoordinator() {
     return coordinator;
   }
 
+  @Override
+  public AbstractWriteFunction getWriteFunction() {
+    return this.writeFunction;
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java

Lines changed: 14 additions & 1 deletion

@@ -21,6 +21,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
@@ -49,10 +50,17 @@ public interface TestFunctionWrapper<I> {
   WriteMetadataEvent[] getEventBuffer();
 
   /**
-   * Returns the next event.
+   * Returns the next event sent to the coordinator.
    */
   OperatorEvent getNextEvent();
 
+  /**
+   * Returns the next event sent to the subtask.
+   */
+  default OperatorEvent getNextSubTaskEvent() {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Snapshot all the functions in the wrapper.
    */
@@ -103,6 +111,11 @@ default void restartCoordinator() throws Exception {
    */
   StreamWriteOperatorCoordinator getCoordinator();
 
+  /**
+   * Returns the write function.
+   */
+  AbstractWriteFunction getWriteFunction();
+
   /**
    * Returns the data buffer of the write task.
    */

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java

Lines changed: 13 additions & 0 deletions

@@ -33,6 +33,7 @@
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.meta.CkpMetadataFactory;
@@ -206,6 +207,18 @@ public TestHarness emptyEventBuffer() {
       return this;
     }
 
+    /**
+     * Asserts that the next subtask event exists and hands it over to the write function.
+     */
+    public TestHarness assertNextSubTaskEvent() {
+      final OperatorEvent nextEvent = this.pipeline.getNextSubTaskEvent();
+      if (nextEvent != null) {
+        MatcherAssert.assertThat("The Coordinator expect to send an event", nextEvent, instanceOf(CommitAckEvent.class));
+        this.pipeline.getWriteFunction().handleOperatorEvent(nextEvent);
+      }
+      return this;
+    }
+
     /**
      * Assert the next event exists and handle over it to the coordinator.
      */

org/apache/hudi/adapter/ExecutionAttemptUtil.java

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+  public static ExecutionAttemptID randomId() {
+    return new ExecutionAttemptID();
+  }
+}
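
A note on why this one-liner lives behind an adapter rather than being inlined into MockSubtaskGateway: the way an ExecutionAttemptID is constructed differs across the Flink versions Hudi supports, so each hudi-flink module can ship its own variant of the adapter. The sketch below is only a guess at what the variant for newer Flink releases might look like, assuming those releases replace the public no-arg constructor with a static ExecutionAttemptID.randomId() factory; it is not part of this commit.

// Hypothetical adapter variant (assumption, not part of this commit): for Flink
// releases without the public no-arg ExecutionAttemptID constructor, the adapter
// would presumably delegate to a static factory such as ExecutionAttemptID.randomId().
package org.apache.hudi.adapter;

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

/**
 * Adapter utils for execution attempt (sketch for newer Flink versions).
 */
public class ExecutionAttemptUtil {

  public static ExecutionAttemptID randomId() {
    return ExecutionAttemptID.randomId();
  }
}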
