@@ -111,6 +111,8 @@ private[consumer] final class Runloop private (
111
111
): Task [Unit ] = {
112
112
val deadline = java.lang.System .nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos
113
113
114
+ def timeToDeadlineMillis (): Long = (deadline - java.lang.System .nanoTime()) / 1000000L
115
+
114
116
val endingTps = streamsToEnd.map(_.tp).toSet
115
117
116
118
def commitsOfEndingStreams (commits : Chunk [Runloop .Commit ]): Chunk [Runloop .Commit ] =
@@ -130,36 +132,92 @@ private[consumer] final class Runloop private (
130
132
ZIO .attempt(consumer.commitAsync(java.util.Collections .emptyMap(), null )).orDie
131
133
}
132
134
133
- def endingStreamsCompletedAndCommitsExist (newCommits : Chunk [Commit ]): Task [Boolean ] =
135
+ sealed trait EndOffsetCommitStatus
136
+ case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = " not committed" }
137
+ case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = " commit pending" }
138
+ case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = " committed" }
139
+
140
+ final case class StreamCompletionStatus (
141
+ tp : TopicPartition ,
142
+ streamEnded : Boolean ,
143
+ lastPulledOffset : Option [Long ],
144
+ endOffsetCommitStatus : EndOffsetCommitStatus
145
+ ) {
146
+ override def toString : String =
147
+ s " ${tp}: " +
148
+ s " ${if (streamEnded) " stream ended" else " stream is running" }, " +
149
+ s " last pulled offset= ${lastPulledOffset.getOrElse(" none" )}, " +
150
+ endOffsetCommitStatus
151
+ }
152
+
153
+ def completionStatusesAsString (completionStatuses : Chunk [StreamCompletionStatus ]): String =
154
+ " Revoked partitions: " + completionStatuses.map(_.toString).mkString(" ; " )
155
+
156
+ def getStreamCompletionStatuses (newCommits : Chunk [Commit ]): UIO [Chunk [StreamCompletionStatus ]] =
134
157
for {
158
+ committedOffsets <- committedOffsetsRef.get
159
+ allPendingCommitOffsets =
160
+ (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets).map {
161
+ case (tp, offsetAndMetadata) => (tp, offsetAndMetadata.offset())
162
+ }
135
163
streamResults <-
136
164
ZIO .foreach(streamsToEnd) { stream =>
137
165
for {
138
166
isDone <- stream.completedPromise.isDone
139
167
lastPulledOffset <- stream.lastPulledOffset
140
168
endOffset <- if (isDone) stream.completedPromise.await else ZIO .none
141
- } yield (isDone || lastPulledOffset.isEmpty, endOffset)
142
- }
143
- committedOffsets <- committedOffsetsRef.get
144
- } yield {
145
- val allStreamsCompleted = streamResults.forall(_._1)
146
- allStreamsCompleted && {
147
- val endOffsets : Chunk [Offset ] = streamResults.flatMap(_._2)
148
- val allPendingCommits = previousPendingCommits ++ commitsOfEndingStreams(newCommits)
149
- endOffsets.forall { endOffset =>
150
- val tp = endOffset.topicPartition
151
- val offset = endOffset.offset
152
- def endOffsetWasCommitted = committedOffsets.contains(tp, offset)
153
- def endOffsetCommitIsPending = allPendingCommits.exists { pendingCommit =>
154
- pendingCommit.offsets.get(tp).exists { pendingOffset =>
155
- pendingOffset.offset() >= offset
156
- }
157
- }
158
- endOffsetWasCommitted || endOffsetCommitIsPending
169
+
170
+ endOffsetCommitStatus =
171
+ endOffset match {
172
+ case Some (endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) =>
173
+ EndOffsetCommitted
174
+ case Some (endOffset) if allPendingCommitOffsets.contains((stream.tp, endOffset.offset)) =>
175
+ EndOffsetCommitPending
176
+ case _ => EndOffsetNotCommitted
177
+ }
178
+ } yield StreamCompletionStatus (stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus)
159
179
}
160
- }
180
+ } yield streamResults
181
+
182
+ @ inline
183
+ def logStreamCompletionStatuses (completionStatuses : Chunk [StreamCompletionStatus ]): UIO [Unit ] = {
184
+ val statusStrings = completionStatusesAsString(completionStatuses)
185
+ ZIO .logInfo(
186
+ s " Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " +
187
+ s " the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings"
188
+ )
189
+ }
190
+
191
+ def logInitialStreamCompletionStatuses : UIO [Unit ] =
192
+ for {
193
+ completionStatuses <- getStreamCompletionStatuses(newCommits = Chunk .empty)
194
+ _ <- logStreamCompletionStatuses(completionStatuses)
195
+ } yield ()
196
+
197
+ def endingStreamsCompletedAndCommitsExist (newCommits : Chunk [Commit ]): UIO [Boolean ] =
198
+ for {
199
+ completionStatuses <- getStreamCompletionStatuses(newCommits)
200
+ _ <- logStreamCompletionStatuses(completionStatuses)
201
+ } yield completionStatuses.forall { status =>
202
+ // A stream is complete when it never got any records, or when it committed the offset of the last consumed record
203
+ status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted )
161
204
}
162
205
206
+ def logFinalStreamCompletionStatuses (completed : Boolean , newCommits : Chunk [Commit ]): UIO [Unit ] =
207
+ if (completed)
208
+ ZIO .logInfo(" Continuing rebalance, all offsets of consumed records in the revoked partitions were committed." )
209
+ else
210
+ for {
211
+ completionStatuses <- getStreamCompletionStatuses(newCommits)
212
+ statusStrings = completionStatusesAsString(completionStatuses)
213
+ _ <-
214
+ ZIO .logWarning(
215
+ s " Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " +
216
+ s " the records they consumed; the rebalance will continue. " +
217
+ s " This might cause another consumer to process some records again. $statusStrings"
218
+ )
219
+ } yield ()
220
+
163
221
def commitSync : Task [Unit ] =
164
222
ZIO .attempt(consumer.commitSync(java.util.Collections .emptyMap(), commitTimeout))
165
223
@@ -179,17 +237,23 @@ private[consumer] final class Runloop private (
179
237
//
180
238
// Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty.
181
239
// Instead, we poll the queue in a loop.
182
- ZIO .logDebug(s " Waiting for ${streamsToEnd.size} streams to end " ) *>
183
- ZStream
184
- .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll)
185
- .tap(commitAsync)
186
- .forever
187
- .takeWhile(_ => java.lang.System .nanoTime() <= deadline)
188
- .scan(Chunk .empty[Runloop .Commit ])(_ ++ _)
189
- .takeUntilZIO(endingStreamsCompletedAndCommitsExist)
190
- .runDrain *>
191
- commitSync *>
192
- ZIO .logDebug(s " Done waiting for ${streamsToEnd.size} streams to end " )
240
+ for {
241
+ _ <- logInitialStreamCompletionStatuses
242
+ completedAndCommits <-
243
+ ZStream
244
+ .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll)
245
+ .tap(commitAsync)
246
+ .forever
247
+ .takeWhile(_ => java.lang.System .nanoTime() <= deadline)
248
+ .scan(Chunk .empty[Runloop .Commit ])(_ ++ _)
249
+ .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits)))
250
+ .takeUntil { case (completed, _) => completed }
251
+ .runLast
252
+ .map(_.getOrElse((false , Chunk .empty)))
253
+ _ <- logFinalStreamCompletionStatuses(completedAndCommits._1, completedAndCommits._2)
254
+ _ <- commitSync
255
+ _ <- ZIO .logDebug(s " Done waiting for ${streamsToEnd.size} streams to end " )
256
+ } yield ()
193
257
}
194
258
195
259
// During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times.
0 commit comments