|
38 | 38 | import java.util.List;
|
39 | 39 | import java.util.Map;
|
40 | 40 | import java.util.Objects;
|
| 41 | +import java.util.Set; |
41 | 42 | import java.util.concurrent.ConcurrentHashMap;
|
42 | 43 | import java.util.concurrent.atomic.AtomicInteger;
|
43 | 44 | import java.util.concurrent.atomic.AtomicReference;
|
@@ -452,15 +453,30 @@ private List<Part> calculateOverflowingParts(final FileGroup fileGroup) throws I
|
452 | 453 | return Collections.singletonList(new Part(partItems, partBytes, List.copyOf(partEntries)));
|
453 | 454 | }
|
454 | 455 |
|
455 |
| - private void closeAggregate(final FeedKey feedKey, |
456 |
| - final AggregateState aggregateState) { |
457 |
| - LOGGER.debug("Closing aggregate: {}", aggregateState); |
| 456 | + private boolean closeAggregate(final FeedKey feedKey, |
| 457 | + final AggregateState aggregateState) { |
| 458 | + LOGGER.debug(() -> LogUtil.message("closeAggregate() - feedKey: {}, {}, waiting for lock", |
| 459 | + feedKey, aggregateState)); |
458 | 460 | final Lock lock = feedKeyLock.get(feedKey);
|
459 | 461 | lock.lock();
|
460 | 462 | try {
|
461 |
| - destination.accept(aggregateState.aggregateDir); |
462 |
| - aggregateStateMap.remove(feedKey); |
463 |
| - LOGGER.debug("Closed aggregate: {}", aggregateState); |
| 463 | + // Now we hold the feedKey lock, re-check the aggregateStateMap |
| 464 | + if (aggregateStateMap.containsKey(feedKey)) { |
| 465 | + LOGGER.debug(() -> LogUtil.message("closeAggregate() - feedKey: {}, {}, acquired lock", |
| 466 | + feedKey, aggregateState)); |
| 467 | + |
| 468 | + destination.accept(aggregateState.aggregateDir); |
| 469 | + aggregateStateMap.remove(feedKey); |
| 470 | + LOGGER.debug(() -> LogUtil.message("closeAggregate() - feedKey: {}, {}, closed aggregate", |
| 471 | + feedKey, aggregateState)); |
| 472 | + return true; |
| 473 | + } else { |
| 474 | + LOGGER.debug(() -> LogUtil.message( |
| 475 | + "closeAggregate() - feedKey: {}, {}, " + |
| 476 | + "feedKey not in aggregateStateMap, another thread must have closed it.", |
| 477 | + feedKey, aggregateState)); |
| 478 | + return false; |
| 479 | + } |
464 | 480 | } finally {
|
465 | 481 | lock.unlock();
|
466 | 482 | }
|
@@ -570,13 +586,23 @@ private AggregateState createAggregate(final FeedKey feedKey) {
|
570 | 586 | */
|
571 | 587 | private synchronized void closeOldAggregates() {
|
572 | 588 | final AtomicInteger count = new AtomicInteger();
|
573 |
| - aggregateStateMap.forEach((feedKey, aggregateState) -> { |
574 |
| - if (aggregateState.isAggregateTooOld(aggregatorConfig)) { |
575 |
| - // Close the current aggregate. |
576 |
| - count.incrementAndGet(); |
577 |
| - closeAggregate(feedKey, aggregateState); |
| 589 | + final Set<FeedKey> feedKeys = aggregateStateMap.keySet(); |
| 590 | + for (final FeedKey feedKey : feedKeys) { |
| 591 | + // It's possible another thread may have removed it |
| 592 | + final AggregateState aggregateState = aggregateStateMap.get(feedKey); |
| 593 | + if (aggregateState != null |
| 594 | + && aggregateState.isAggregateTooOld(aggregatorConfig)) { |
| 595 | + // Close the current aggregate, under a feedKey lock, so again, |
| 596 | + // another thread may beat us |
| 597 | + final boolean didClose = closeAggregate(feedKey, aggregateState); |
| 598 | + if (didClose) { |
| 599 | + count.incrementAndGet(); |
| 600 | + } else { |
| 601 | + LOGGER.debug("closeAggregate() - feedKey: {}, aggregateState: {}, didn't close", |
| 602 | + feedKey, aggregateState); |
| 603 | + } |
578 | 604 | }
|
579 |
| - }); |
| 605 | + } |
580 | 606 | if (LOGGER.isDebugEnabled()) {
|
581 | 607 | if (count.get() > 0) {
|
582 | 608 | LOGGER.debug("closeOldAggregates() - closed {} old aggregates", count);
|
|
0 commit comments