Fix: record tracker does not support concurrent event batch removal
ludovic-boutros committed Apr 26, 2024
1 parent 6879f4b commit 077178b
Showing 5 changed files with 130 additions and 47 deletions.
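
The core of the fix, visible in the KafkaRecordTracker diff below, is guarding removeAckedEventBatches with an AtomicBoolean so that only one thread performs the cleanup at a time while concurrent callers simply skip it. A minimal, self-contained sketch of that single-flight pattern (class and method names here are illustrative, not the connector's API):

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative single-flight guard: the first caller wins the CAS and does the
// work; concurrent callers return immediately instead of racing on shared state.
final class SingleFlightCleanup {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    void runOnce(Runnable cleanup) {
        if (inProgress.compareAndSet(false, true)) {
            try {
                cleanup.run();
            } finally {
                inProgress.set(false); // always release so later invocations can run
            }
        }
        // else: another thread is already cleaning up; this call becomes a no-op
    }
}
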
70 changes: 48 additions & 22 deletions src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
@@ -17,6 +17,7 @@

import com.splunk.hecclient.Event;
import com.splunk.hecclient.EventBatch;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -27,6 +28,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
@@ -39,6 +41,7 @@ final class KafkaRecordTracker {
private ConcurrentLinkedQueue<EventBatch> failed;
private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
private Collection<TopicPartition> partitions;
private AtomicBoolean removingEventsInProgress = new AtomicBoolean(false);

public KafkaRecordTracker() {
all = new ConcurrentHashMap<>();
@@ -58,35 +61,58 @@ public void removeAckedEventBatches(final List<EventBatch> batches) {
log.debug("received acked event batches={}", batches);
/* Loop all *assigned* partitions to find the lowest consecutive
* HEC-commited offsets. A batch could contain events coming from a
* variety of topic/partitions, and scanning those events coulb be
* variety of topic/partitions, and scanning those events could be
* expensive.
* Note that if some events are tied to an unassigned partition those
* offsets won't be able to be commited.
* offsets won't be able to be committed.
*/
for (TopicPartition tp : partitions) {
ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
if (tpRecords == null) {
continue; // nothing to remove in this case
}
long offset = -1;
Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
for (; iter.hasNext();) {
Map.Entry<Long, EventBatch> e = iter.next();
if (e.getValue().isCommitted()) {
log.debug("processing offset {}", e.getKey());
offset = e.getKey();
iter.remove();
total.decrementAndGet();
} else {
break;
// With the current implementation, we don't need to let multiple threads clean events in parallel.
if (removingEventsInProgress.compareAndSet(false, true)) {
try {
long countOfEventsToRemove = 0;

for (TopicPartition tp : partitions) {
ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
if (tpRecords == null) {
continue; // nothing to remove in this case
}
long offset = -1;
Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
for (; iter.hasNext(); ) {
Map.Entry<Long, EventBatch> e = iter.next();
if (e.getValue().isCommitted()) {
log.debug("processing offset {}", e.getKey());
offset = e.getKey();
iter.remove();
countOfEventsToRemove++;
} else {
break;
}
}
if (offset >= 0) {
offsets.put(tp, new OffsetAndMetadata(offset + 1));
}
}
}
if (offset >= 0) {
offsets.put(tp, new OffsetAndMetadata(offset + 1));
decrementTotalEventCount(countOfEventsToRemove);
} finally {
removingEventsInProgress.set(false);
}
}
}

private void decrementTotalEventCount(long countOfEventsToRemove) {
total.getAndUpdate(current -> {
if (current < countOfEventsToRemove) {
log.warn("Total event count ({}) is lower than the count ({}) we try to remove, resetting to 0",
current,
countOfEventsToRemove);
return 0;
} else {
return current - countOfEventsToRemove;
}
});
}

public void addFailedEventBatch(final EventBatch batch) {
if (!batch.isFailed()) {
throw new RuntimeException("event batch was not failed");
@@ -180,7 +206,7 @@ public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
log.warn("purge events={} from closed partitions={}",
countOfEventsToRemove, partitions);
all.keySet().removeAll(partitions);
total.addAndGet(-1L * countOfEventsToRemove);
decrementTotalEventCount(countOfEventsToRemove);
}
}
}
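
Together with the guard, the new decrementTotalEventCount helper above makes the shared event counter saturate at zero rather than go negative when a removal count exceeds the tracked total. A tiny standalone sketch of that clamping behaviour with AtomicLong.getAndUpdate (hypothetical demo class, not part of the connector):

import java.util.concurrent.atomic.AtomicLong;

public class ClampedDecrementDemo {
    public static void main(String[] args) {
        AtomicLong total = new AtomicLong(3);
        long toRemove = 5; // more than we actually hold

        // getAndUpdate applies the whole clamp atomically, so the counter
        // can never be driven below zero by an oversized removal.
        total.getAndUpdate(current -> current < toRemove ? 0 : current - toRemove);

        System.out.println(total.get()); // prints 0, not -2
    }
}
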
9 changes: 9 additions & 0 deletions src/test/java/com/splunk/hecclient/UnitUtil.java
@@ -35,6 +35,15 @@ public static EventBatch createBatch() {
return batch;
}

public static EventBatch createMultiBatch(int count) {
EventBatch batch = new JsonEventBatch();
for (int i = 0; i < count; i++) {
Event event = new JsonEvent("ni-" + i, "hao-" + i);
batch.add(event);
}
return batch;
}

public static EventBatch createRawEventBatch() {
Event event = new RawEvent("ni", "hao");
EventBatch batch = RawEventBatch.factory().build();
48 changes: 48 additions & 0 deletions src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
@@ -27,6 +27,10 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class KafkaRecordTrackerTest {
@Test
@@ -48,6 +52,38 @@ public void addNonFailedEventBatch() {
tracker.addFailedEventBatch(batch);
}

@Test
public void removeEventBatchMultiThread() {
List<EventBatch> batches = new ArrayList<>();
KafkaRecordTracker tracker = new KafkaRecordTracker();
tracker.open(createTopicPartitionList(500));

for (int i = 0; i < 100; i++) {
EventBatch batch = UnitUtil.createMultiBatch(500);
for (int j = 0; j < 500; j++) {
batch.getEvents().get(j).setTied(createSinkRecord(j, i * 1000 + j));
}
batch.commit();
batches.add(batch);
tracker.addEventBatch(batch);
}

Assert.assertEquals(50000, tracker.totalEvents());
ExecutorService executorService = Executors.newFixedThreadPool(2);
try {
Future<?> first = executorService.submit(() -> tracker.removeAckedEventBatches(batches));
Future<?> second = executorService.submit(() -> tracker.removeAckedEventBatches(batches));

first.get();
second.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
executorService.shutdown();
}

Assert.assertEquals(0, tracker.totalEvents());
}
@Test
public void addEventBatch() {
List<EventBatch> batches = new ArrayList<>();
@@ -104,4 +140,16 @@ private List<TopicPartition> createTopicPartitionList() {
tps.add(new TopicPartition("t", 1));
return tps;
}

private SinkRecord createSinkRecord(int partition, long offset) {
return new SinkRecord("t", partition, null, null, null, "ni, hao", offset);
}

private List<TopicPartition> createTopicPartitionList(int number) {
ArrayList<TopicPartition> tps = new ArrayList<>();
for (int i = 0; i < number; i++) {
tps.add(new TopicPartition("t", i));
}
return tps;
}
}
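
The new removeEventBatchMultiThread test above submits the same acked batches to two threads and asserts that totalEvents() lands on exactly zero. One plausible reading of the hazard it covers: without the guard, per-entry decrementAndGet calls from two threads iterating the same weakly consistent map can both see an entry before either removes it, decrementing twice for a single event and eventually pushing the counter negative. A hypothetical, self-contained sketch of that double-decrement pattern (illustrative only, not the connector's code; the race is timing-dependent, so the final value merely tends to go negative):

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class DoubleDecrementHazardDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentSkipListMap<Long, String> records = new ConcurrentSkipListMap<>();
        for (long i = 0; i < 100_000; i++) {
            records.put(i, "event-" + i);
        }
        AtomicLong total = new AtomicLong(records.size());

        // Both threads walk the same map and decrement once per entry they see.
        // The weakly consistent iterator can hand the same entry to both threads,
        // so a single record may be counted twice and the total can underflow.
        Runnable unguardedCleanup = () -> {
            Iterator<Map.Entry<Long, String>> it = records.entrySet().iterator();
            while (it.hasNext()) {
                it.next();
                it.remove();
                total.decrementAndGet();
            }
        };

        Thread t1 = new Thread(unguardedCleanup);
        Thread t2 = new Thread(unguardedCleanup);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("total after concurrent cleanup = " + total.get()); // often < 0
    }
}
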
48 changes: 24 additions & 24 deletions target/site/jacoco/jacoco.csv
@@ -1,42 +1,42 @@
GROUP,PACKAGE,CLASS,INSTRUCTION_MISSED,INSTRUCTION_COVERED,BRANCH_MISSED,BRANCH_COVERED,LINE_MISSED,LINE_COVERED,COMPLEXITY_MISSED,COMPLEXITY_COVERED,METHOD_MISSED,METHOD_COVERED
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new Credentials() {...},10,0,0,0,3,0,3,0,3,0
splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,8,0,7,0,6
splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,9,0,7,0,6
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new HostnameVerifier() {...},2,6,0,0,1,1,1,1,1,1
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new TrustStrategy() {...},2,6,0,0,1,1,1,1,1,1
splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,9,73,4,20,0,15
splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,195,1,13,5,48,1,16,0,10
splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,10,74,4,20,0,15
splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,196,1,13,6,50,1,16,0,10
splunk-kafka-connect,com.splunk.hecclient,HecException,0,9,0,0,0,4,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,47,363,8,24,12,88,7,23,1,13
splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,28,157,9,44,0,25
splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,2,8,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,17,46,2,13,2,11
splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,2,18,0,8,0,4
splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,168,1,9,8,46,3,17,2,13
splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,5,21,0,11,0,7
splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,48,362,8,24,12,87,7,23,1,13
splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,34,160,9,44,0,25
splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,3,8,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,16,46,2,13,2,11
splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,3,17,0,8,0,4
splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,169,1,9,8,45,3,17,2,13
splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,6,21,0,11,0,7
splunk-kafka-connect,com.splunk.hecclient,HecEmptyEventException,5,4,0,0,2,2,1,1,1,1
splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,36,0,22,0,16
splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,238,0,12,0,58,0,28,0,22
splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,35,0,22,0,16
splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,236,0,12,0,60,0,28,0,22
splunk-kafka-connect,com.splunk.hecclient,HecNullEventException,5,4,0,0,2,2,1,1,1,1
splunk-kafka-connect,com.splunk.hecclient,Event,3,204,0,6,1,67,1,29,1,26
splunk-kafka-connect,com.splunk.hecclient,RawEventBatch.Builder,0,45,0,0,0,13,0,7,0,7
splunk-kafka-connect,com.splunk.hecclient,Indexer,178,421,9,19,38,95,10,23,3,16
splunk-kafka-connect,com.splunk.hecclient,Indexer,179,420,9,19,40,96,10,23,3,16
splunk-kafka-connect,com.splunk.hecclient,EventBatch.GzipDataContentProducer,0,21,0,0,0,6,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,8,0,5,0,3
splunk-kafka-connect,com.splunk.hecclient,HecConfig,4,228,1,1,1,81,2,43,1,43
splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,9,0,5,0,3
splunk-kafka-connect,com.splunk.hecclient,HecConfig,5,227,1,1,1,81,2,43,1,43
splunk-kafka-connect,com.splunk.hecclient,Indexer.new Configuration() {...},21,0,0,0,4,0,2,0,2,0
splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,14,0,4,0,2
splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,13,0,4,0,2
splunk-kafka-connect,com.splunk.hecclient,DoubleSerializer,0,15,0,0,0,4,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,PostResponse,0,37,0,2,0,13,0,8,0,7
splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,34,79,1,7,7,22,3,11,2,8
splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,10,3,6,2,5
splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,33,80,1,7,7,23,3,11,2,8
splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,9,3,6,2,5
splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity.new Enumeration() {...},0,44,0,4,0,4,0,5,0,3
splunk-kafka-connect,com.splunk.kafka.connect,HecClientWrapper,3,3,0,0,1,1,1,1,1,1
splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,22,315,4,22,6,68,4,26,0,17
splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,6,0,3,0,2
splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,3,25,1,11,1,6
splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,27,351,5,25,10,76,5,29,0,19
splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,7,0,3,0,2
splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,4,26,1,11,1,6
splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule,0,10,0,0,0,3,0,1,0,1
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkRecord,74,188,14,14,22,47,18,9,5,8
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1337,6,72,1,236,6,47,0,14
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,338,3,25,12,82,4,26,1,15
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1269,6,72,1,198,6,47,0,14
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,383,4,30,11,85,5,28,1,15
splunk-kafka-connect,com.splunk.kafka.connect,AbstractClientWrapper,0,3,0,0,0,1,0,1,0,1
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,415,1134,58,84,66,252,48,56,3,30
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,416,1134,58,84,68,260,48,56,3,30
2 changes: 1 addition & 1 deletion target/site/jacoco/jacoco.xml

Large diffs are not rendered by default.
