Fix: record tracker does not support concurrent event batch removal #431

Merged
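This change guards KafkaRecordTracker.removeAckedEventBatches with an AtomicBoolean so that only one thread at a time scans the assigned partitions and removes committed batches, and it replaces the per-event decrement of the total counter with a single clamped decrement per pass. Below is a minimal, self-contained sketch of that compare-and-set guard; the class and method names are illustrative and not part of the PR.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative sketch of the single-flight guard introduced by this PR.
    // Only the thread that wins the compareAndSet performs the cleanup;
    // concurrent callers return immediately instead of duplicating the work.
    final class SingleFlightGuardSketch {
        private final AtomicBoolean removingInProgress = new AtomicBoolean(false);

        void removeAckedBatches() {
            if (removingInProgress.compareAndSet(false, true)) {
                try {
                    // scan partitions and remove committed batches here
                } finally {
                    removingInProgress.set(false); // always release the guard
                }
            }
            // a thread that loses the CAS skips this pass; a later call will retry
        }
    }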
69 changes: 47 additions & 22 deletions src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
@@ -39,6 +40,7 @@ final class KafkaRecordTracker {
private ConcurrentLinkedQueue<EventBatch> failed;
private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
private Collection<TopicPartition> partitions;
private AtomicBoolean removingEventsInProgress = new AtomicBoolean(false);

public KafkaRecordTracker() {
all = new ConcurrentHashMap<>();
@@ -58,35 +60,58 @@ public void removeAckedEventBatches(final List<EventBatch> batches) {
log.debug("received acked event batches={}", batches);
/* Loop all *assigned* partitions to find the lowest consecutive
* HEC-commited offsets. A batch could contain events coming from a
* variety of topic/partitions, and scanning those events coulb be
* variety of topic/partitions, and scanning those events could be
* expensive.
* Note that if some events are tied to an unassigned partition those
* offsets won't be able to be commited.
* offsets won't be able to be committed.
*/
for (TopicPartition tp : partitions) {
ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
if (tpRecords == null) {
continue; // nothing to remove in this case
}
long offset = -1;
Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
for (; iter.hasNext();) {
Map.Entry<Long, EventBatch> e = iter.next();
if (e.getValue().isCommitted()) {
log.debug("processing offset {}", e.getKey());
offset = e.getKey();
iter.remove();
total.decrementAndGet();
} else {
break;
// With the current implementation, we don't need multiple threads cleaning events in parallel.
if (removingEventsInProgress.compareAndSet(false, true)) {
try {
long countOfEventsToRemove = 0;

for (TopicPartition tp : partitions) {
ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
if (tpRecords == null) {
continue; // nothing to remove in this case
}
long offset = -1;
Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
for (; iter.hasNext(); ) {
Map.Entry<Long, EventBatch> e = iter.next();
if (e.getValue().isCommitted()) {
log.debug("processing offset {}", e.getKey());
offset = e.getKey();
iter.remove();
countOfEventsToRemove++;
} else {
break;
}
}
if (offset >= 0) {
offsets.put(tp, new OffsetAndMetadata(offset + 1));
}
}
}
if (offset >= 0) {
offsets.put(tp, new OffsetAndMetadata(offset + 1));
decrementTotalEventCount(countOfEventsToRemove);
} finally {
removingEventsInProgress.set(false);
}
}
}

private void decrementTotalEventCount(long countOfEventsToRemove) {
total.getAndUpdate(current -> {
if (current < countOfEventsToRemove) {
log.warn("Total event count ({}) is lower than the count ({}) we try to remove, resetting to 0",
current,
countOfEventsToRemove);
return 0;
} else {
return current - countOfEventsToRemove;
}
});
}

public void addFailedEventBatch(final EventBatch batch) {
if (!batch.isFailed()) {
throw new RuntimeException("event batch was not failed");
@@ -180,7 +205,7 @@ public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
log.warn("purge events={} from closed partitions={}",
countOfEventsToRemove, partitions);
all.keySet().removeAll(partitions);
total.addAndGet(-1L * countOfEventsToRemove);
decrementTotalEventCount(countOfEventsToRemove);
}
}
}
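The new decrementTotalEventCount helper applies the decrement through AtomicLong.getAndUpdate, so the shared counter is updated atomically and clamped at zero (with a warning) rather than going negative if a pass ever tries to remove more events than are currently tracked. A standalone sketch of that clamping update, using illustrative values and nothing beyond java.util.concurrent.atomic:

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the clamped decrement pattern used by decrementTotalEventCount.
    public class ClampedDecrementSketch {
        public static void main(String[] args) {
            AtomicLong total = new AtomicLong(3);

            // normal case: 3 - 2 = 1
            total.getAndUpdate(current -> current < 2 ? 0 : current - 2);
            System.out.println(total.get()); // prints 1

            // over-removal: removing 5 from 1 clamps to 0 instead of -4
            total.getAndUpdate(current -> current < 5 ? 0 : current - 5);
            System.out.println(total.get()); // prints 0
        }
    }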
9 changes: 9 additions & 0 deletions src/test/java/com/splunk/hecclient/UnitUtil.java
@@ -35,6 +35,15 @@ public static EventBatch createBatch() {
return batch;
}

public static EventBatch createMultiBatch(int count) {
EventBatch batch = new JsonEventBatch();
for (int i = 0; i < count; i++) {
Event event = new JsonEvent("ni-" + i, "hao-" + i);
batch.add(event);
}
return batch;
}

public static EventBatch createRawEventBatch() {
Event event = new RawEvent("ni", "hao");
EventBatch batch = RawEventBatch.factory().build();
48 changes: 48 additions & 0 deletions src/test/java/com/splunk/kafka/connect/KafkaRecordTrackerTest.java
@@ -27,6 +27,10 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class KafkaRecordTrackerTest {
@Test
@@ -48,6 +52,38 @@ public void addNonFailedEventBatch() {
tracker.addFailedEventBatch(batch);
}

@Test
public void removeEventBatchMultiThread() {
List<EventBatch> batches = new ArrayList<>();
KafkaRecordTracker tracker = new KafkaRecordTracker();
tracker.open(createTopicPartitionList(500));

for (int i = 0; i < 100; i++) {
EventBatch batch = UnitUtil.createMultiBatch(500);
for (int j = 0; j < 500; j++) {
batch.getEvents().get(j).setTied(createSinkRecord(j, i * 1000 + j));
}
batch.commit();
batches.add(batch);
tracker.addEventBatch(batch);
}

Assert.assertEquals(50000, tracker.totalEvents());
ExecutorService executorService = Executors.newFixedThreadPool(2);
try {
Future<?> first = executorService.submit(() -> tracker.removeAckedEventBatches(batches));
Future<?> second = executorService.submit(() -> tracker.removeAckedEventBatches(batches));

first.get();
second.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
executorService.shutdown();
}

Assert.assertEquals(0, tracker.totalEvents());
}
@Test
public void addEventBatch() {
List<EventBatch> batches = new ArrayList<>();
@@ -104,4 +140,16 @@ private List<TopicPartition> createTopicPartitionList() {
tps.add(new TopicPartition("t", 1));
return tps;
}

private SinkRecord createSinkRecord(int partition, long offset) {
return new SinkRecord("t", partition, null, null, null, "ni, hao", offset);
}

private List<TopicPartition> createTopicPartitionList(int number) {
ArrayList<TopicPartition> tps = new ArrayList<>();
for (int i = 0; i < number; i++) {
tps.add(new TopicPartition("t", i));
}
return tps;
}
}
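The added removeEventBatchMultiThread test commits 100 batches of 500 events each across 500 partitions, runs two removeAckedEventBatches calls concurrently on a fixed thread pool, and asserts that the tracked total lands at exactly 0; with the new guard, the second pass either skips while the first is in progress or finds nothing left to remove, so removals are no longer counted twice.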
48 changes: 24 additions & 24 deletions target/site/jacoco/jacoco.csv
@@ -1,42 +1,42 @@
GROUP,PACKAGE,CLASS,INSTRUCTION_MISSED,INSTRUCTION_COVERED,BRANCH_MISSED,BRANCH_COVERED,LINE_MISSED,LINE_COVERED,COMPLEXITY_MISSED,COMPLEXITY_COVERED,METHOD_MISSED,METHOD_COVERED
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new Credentials() {...},10,0,0,0,3,0,3,0,3,0
splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,8,0,7,0,6
splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity,0,40,0,2,0,9,0,7,0,6
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new HostnameVerifier() {...},2,6,0,0,1,1,1,1,1,1
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder.new TrustStrategy() {...},2,6,0,0,1,1,1,1,1,1
splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,9,73,4,20,0,15
splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,195,1,13,5,48,1,16,0,10
splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,10,74,4,20,0,15
splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,196,1,13,6,50,1,16,0,10
splunk-kafka-connect,com.splunk.hecclient,HecException,0,9,0,0,0,4,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,47,363,8,24,12,88,7,23,1,13
splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,28,157,9,44,0,25
splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,2,8,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,17,46,2,13,2,11
splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,2,18,0,8,0,4
splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,168,1,9,8,46,3,17,2,13
splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,5,21,0,11,0,7
splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,48,362,8,24,12,87,7,23,1,13
splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,34,160,9,44,0,25
splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,3,8,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,16,46,2,13,2,11
splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,3,17,0,8,0,4
splunk-kafka-connect,com.splunk.hecclient,RawEventBatch,22,169,1,9,8,45,3,17,2,13
splunk-kafka-connect,com.splunk.hecclient,JsonEvent,22,60,0,8,6,21,0,11,0,7
splunk-kafka-connect,com.splunk.hecclient,HecEmptyEventException,5,4,0,0,2,2,1,1,1,1
splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,36,0,22,0,16
splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,238,0,12,0,58,0,28,0,22
splunk-kafka-connect,com.splunk.hecclient,HecChannel,0,116,0,12,0,35,0,22,0,16
splunk-kafka-connect,com.splunk.hecclient,EventBatch,0,236,0,12,0,60,0,28,0,22
splunk-kafka-connect,com.splunk.hecclient,HecNullEventException,5,4,0,0,2,2,1,1,1,1
splunk-kafka-connect,com.splunk.hecclient,Event,3,204,0,6,1,67,1,29,1,26
splunk-kafka-connect,com.splunk.hecclient,RawEventBatch.Builder,0,45,0,0,0,13,0,7,0,7
splunk-kafka-connect,com.splunk.hecclient,Indexer,178,421,9,19,38,95,10,23,3,16
splunk-kafka-connect,com.splunk.hecclient,Indexer,179,420,9,19,40,96,10,23,3,16
splunk-kafka-connect,com.splunk.hecclient,EventBatch.GzipDataContentProducer,0,21,0,0,0,6,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,8,0,5,0,3
splunk-kafka-connect,com.splunk.hecclient,HecConfig,4,228,1,1,1,81,2,43,1,43
splunk-kafka-connect,com.splunk.hecclient,HecAckPollResponse,0,43,0,4,0,9,0,5,0,3
splunk-kafka-connect,com.splunk.hecclient,HecConfig,5,227,1,1,1,81,2,43,1,43
splunk-kafka-connect,com.splunk.hecclient,Indexer.new Configuration() {...},21,0,0,0,4,0,2,0,2,0
splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,14,0,4,0,2
splunk-kafka-connect,com.splunk.hecclient,HecURIBuilder,6,59,0,4,2,13,0,4,0,2
splunk-kafka-connect,com.splunk.hecclient,DoubleSerializer,0,15,0,0,0,4,0,2,0,2
splunk-kafka-connect,com.splunk.hecclient,PostResponse,0,37,0,2,0,13,0,8,0,7
splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,34,79,1,7,7,22,3,11,2,8
splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,10,3,6,2,5
splunk-kafka-connect,com.splunk.hecclient,ResponsePoller,33,80,1,7,7,23,3,11,2,8
splunk-kafka-connect,com.splunk.hecclient,JsonEventBatch,19,33,2,2,7,9,3,6,2,5
splunk-kafka-connect,com.splunk.hecclient,EventBatch.HttpEventBatchEntity.new Enumeration() {...},0,44,0,4,0,4,0,5,0,3
splunk-kafka-connect,com.splunk.kafka.connect,HecClientWrapper,3,3,0,0,1,1,1,1,1,1
splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,22,315,4,22,6,68,4,26,0,17
splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,6,0,3,0,2
splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,3,25,1,11,1,6
splunk-kafka-connect,com.splunk.kafka.connect,KafkaRecordTracker,27,351,5,25,10,76,5,29,0,19
splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule.StructSerializer,0,36,0,2,0,7,0,3,0,2
splunk-kafka-connect,com.splunk.kafka.connect,VersionUtils,11,86,0,10,4,26,1,11,1,6
splunk-kafka-connect,com.splunk.kafka.connect,JacksonStructModule,0,10,0,0,0,3,0,1,0,1
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkRecord,74,188,14,14,22,47,18,9,5,8
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1337,6,72,1,236,6,47,0,14
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,338,3,25,12,82,4,26,1,15
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1269,6,72,1,198,6,47,0,14
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,383,4,30,11,85,5,28,1,15
splunk-kafka-connect,com.splunk.kafka.connect,AbstractClientWrapper,0,3,0,0,0,1,0,1,0,1
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,415,1134,58,84,66,252,48,56,3,30
splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,416,1134,58,84,68,260,48,56,3,30
2 changes: 1 addition & 1 deletion target/site/jacoco/jacoco.xml

Large diffs are not rendered by default.
