@@ -27,6 +27,9 @@ import org.jxmpp.jid.Jid
27
27
import java.time.Clock
28
28
import java.time.Duration
29
29
import java.time.Instant
30
+ import java.util.concurrent.atomic.AtomicBoolean
31
+ import java.util.concurrent.atomic.AtomicInteger
32
+ import kotlin.math.max
30
33
import org.jitsi.jicofo.bridge.BridgeConfig.Companion.config as config
31
34
32
35
/* *
@@ -48,36 +51,34 @@ class Bridge @JvmOverloads internal constructor(
48
51
private val clock : Clock = Clock .systemUTC()
49
52
) : Comparable<Bridge> {
50
53
51
- /* *
52
- * Keep track of the recently added endpoints.
53
- */
54
+ /* * Keep track of the recently added endpoints. */
54
55
private val newEndpointsRate = RateTracker (
55
56
config.participantRampupInterval,
56
57
Duration .ofMillis(100 ),
57
58
clock
58
59
)
59
60
60
- /* *
61
- * The last report stress level
62
- */
61
+ private val endpointRestartRequestRate = RateTracker (
62
+ config.iceFailureDetection.interval,
63
+ Duration .ofSeconds(1 ),
64
+ clock
65
+ )
66
+
67
+ /* * Number of endpoints currently allocated on this bridge by this jicofo instance. */
68
+ val endpoints = AtomicInteger (0 )
69
+
70
+ /* * The last report stress level */
63
71
var lastReportedStressLevel = 0.0
64
72
private set
65
73
66
- /* *
67
- * Holds bridge version (if known - not all bridge version are capable of
68
- * reporting it).
69
- */
74
+ /* * Holds bridge version (if known - not all bridge version are capable of reporting it). */
70
75
private var version: String? = null
71
76
72
- /* *
73
- * Whether the last received presence indicated the bridge is healthy.
74
- */
77
+ /* * Whether the last received presence indicated the bridge is healthy. */
75
78
var isHealthy = true
76
79
private set
77
80
78
- /* *
79
- * Holds bridge release ID, or null if not known.
80
- */
81
+ /* * Holds bridge release ID, or null if not known. */
81
82
private var releaseId: String? = null
82
83
83
84
/* *
@@ -108,25 +109,16 @@ class Bridge @JvmOverloads internal constructor(
108
109
}
109
110
}
110
111
111
- /* *
112
- * Start out with the configured value, update if the bridge reports a value.
113
- */
112
+ /* * Start out with the configured value, update if the bridge reports a value. */
114
113
private var averageParticipantStress = config.averageParticipantStress
115
114
116
- /* *
117
- * Stores a boolean that indicates whether the bridge is in graceful shutdown mode.
118
- */
115
+ /* * Stores a boolean that indicates whether the bridge is in graceful shutdown mode. */
119
116
var isInGracefulShutdown = false // we assume it is not shutting down
120
117
121
- /* *
122
- * Whether the bridge is in SHUTTING_DOWN mode.
123
- */
118
+ /* * Whether the bridge is in SHUTTING_DOWN mode. */
124
119
var isShuttingDown = false
125
120
private set
126
121
127
- /* *
128
- * @return true if the bridge is currently in drain mode
129
- */
130
122
/* *
131
123
* Stores a boolean that indicates whether the bridge is in drain mode.
132
124
*/
@@ -140,19 +132,27 @@ class Bridge @JvmOverloads internal constructor(
140
132
*/
141
133
private var failureInstant: Instant ? = null
142
134
143
- /* *
144
- * @return the region of this [Bridge].
145
- */
135
+ /* * @return the region of this [Bridge]. */
146
136
var region: String? = null
147
137
private set
148
138
149
- /* *
150
- * @return the relay ID advertised by the bridge, or `null` if
151
- * none was advertised.
152
- */
139
+ /* * @return the relay ID advertised by the bridge, or `null` if none was advertised. */
153
140
var relayId: String? = null
154
141
private set
155
142
143
+ /* *
144
+ * If this [Bridge] has been removed from the list of bridges. Once removed, the metrics specific to this instance
145
+ * are cleared and no longer emitted. If the bridge re-connects, a new [Bridge] instance will be created.
146
+ */
147
+ val removed = AtomicBoolean (false )
148
+
149
+ /* *
150
+ * The last instant at which we detected, based on restart requests from endpoints, that this bridge is failing ICE
151
+ */
152
+ private var lastIceFailed = Instant .MIN
153
+ private val failingIce: Boolean
154
+ get() = Duration .between(lastIceFailed, clock.instant()) < config.iceFailureDetection.timeout
155
+
156
156
private val logger: Logger = LoggerImpl (Bridge ::class .java.name)
157
157
158
158
init {
@@ -237,37 +237,75 @@ class Bridge @JvmOverloads internal constructor(
237
237
return compare(this , other)
238
238
}
239
239
240
- /* *
241
- * Notifies this [Bridge] that it was used for a new endpoint.
242
- */
240
+ /* * Notifies this [Bridge] that it was used for a new endpoint. */
243
241
fun endpointAdded () {
244
242
newEndpointsRate.update(1 )
243
+ endpoints.incrementAndGet()
244
+ if (! removed.get()) {
245
+ BridgeMetrics .endpoints.set(endpoints.get().toLong(), listOf (jid.resourceOrEmpty.toString()))
246
+ }
245
247
}
246
248
247
- /* *
248
- * Returns the net number of video channels recently allocated or removed
249
- * from this bridge.
250
- */
249
+ fun endpointRemoved () = endpointsRemoved(1 )
250
+ fun endpointsRemoved (count : Int ) {
251
+ endpoints.addAndGet(- count)
252
+ if (! removed.get()) {
253
+ BridgeMetrics .endpoints.set(endpoints.get().toLong(), listOf (jid.resourceOrEmpty.toString()))
254
+ }
255
+ if (endpoints.get() < 0 ) {
256
+ logger.error(" Removed more endpoints than were allocated. Resetting to 0." , Throwable ())
257
+ endpoints.set(0 )
258
+ }
259
+ }
260
+ internal fun markRemoved () {
261
+ if (removed.compareAndSet(false , true )) {
262
+ BridgeMetrics .restartRequestsMetric.remove(listOf (jid.resourceOrEmpty.toString()))
263
+ BridgeMetrics .endpoints.remove(listOf (jid.resourceOrEmpty.toString()))
264
+ }
265
+ }
266
+ internal fun updateMetrics () {
267
+ if (! removed.get()) {
268
+ BridgeMetrics .failingIce.set(failingIce, listOf (jid.resourceOrEmpty.toString()))
269
+ }
270
+ }
271
+
272
+ fun endpointRequestedRestart () {
273
+ endpointRestartRequestRate.update(1 )
274
+ if (! removed.get()) {
275
+ BridgeMetrics .restartRequestsMetric.inc(listOf (jid.resourceOrEmpty.toString()))
276
+ }
277
+
278
+ if (config.iceFailureDetection.enabled) {
279
+ val restartCount = endpointRestartRequestRate.getAccumulatedCount()
280
+ val endpoints = endpoints.get()
281
+ if (endpoints >= config.iceFailureDetection.minEndpoints &&
282
+ restartCount > endpoints * config.iceFailureDetection.threshold
283
+ ) {
284
+ // Reset the timeout regardless of the previous state, but only log if the state changed.
285
+ if (! failingIce) {
286
+ logger.info(" Detected an ICE failing state." )
287
+ }
288
+ lastIceFailed = clock.instant()
289
+ }
290
+ }
291
+ }
292
+
293
+ /* * Returns the net number of video channels recently allocated or removed from this bridge. */
251
294
private val recentlyAddedEndpointCount: Long
252
295
get() = newEndpointsRate.getAccumulatedCount()
253
296
254
- /* *
255
- * The version of this bridge (with embedded release ID, if available).
256
- */
297
+ /* * The version of this bridge (with embedded release ID, if available). */
257
298
val fullVersion: String?
258
299
get() = if (version != null && releaseId != null ) " $version -$releaseId " else version
259
300
260
- /* *
261
- * {@inheritDoc}
262
- */
263
301
override fun toString (): String {
264
302
return String .format(
265
- " Bridge[jid=%s, version=%s, relayId=%s, region=%s, stress =%.2f]" ,
303
+ " Bridge[jid=%s, version=%s, relayId=%s, region=%s, correctedStress =%.2f]" ,
266
304
jid.toString(),
267
305
fullVersion,
268
306
relayId,
269
307
region,
270
- stress
308
+ correctedStress
271
309
)
272
310
}
273
311
@@ -276,34 +314,37 @@ class Bridge @JvmOverloads internal constructor(
276
314
* can exceed 1).
277
315
* @return this bridge's stress level
278
316
*/
279
- val stress: Double
280
- get() =
281
- // While a stress of 1 indicates a bridge is fully loaded, we allow
282
- // larger values to keep sorting correctly.
283
- lastReportedStressLevel +
284
- recentlyAddedEndpointCount.coerceAtLeast(0 ) * averageParticipantStress
317
+ val correctedStress: Double
318
+ get() {
319
+ // Correct for recently added endpoints.
320
+ // While a stress of 1 indicates a bridge is fully loaded, we allow larger values to keep sorting correctly.
321
+ val s = lastReportedStressLevel + recentlyAddedEndpointCount.coerceAtLeast(0 ) * averageParticipantStress
285
322
286
- /* *
287
- * @return true if the stress of the bridge is greater-than-or-equal to the threshold.
288
- */
323
+ // Correct for failing ICE.
324
+ return if (failingIce) max(s, config.stressThreshold + 0.01 ) else s
325
+ }
326
+
327
+ /* * @return true if the stress of the bridge is greater-than-or-equal to the threshold. */
289
328
val isOverloaded: Boolean
290
- get() = stress >= config.stressThreshold
329
+ get() = correctedStress >= config.stressThreshold
291
330
292
331
val debugState: OrderedJsonObject
293
- get() {
294
- val o = OrderedJsonObject ()
295
- o[" version" ] = version.toString()
296
- o[" release" ] = releaseId.toString()
297
- o[" stress" ] = stress
298
- o[" operational" ] = isOperational
299
- o[" region" ] = region.toString()
300
- o[" drain" ] = isDraining
301
- o[" graceful-shutdown" ] = isInGracefulShutdown
302
- o[" shutting-down" ] = isShuttingDown
303
- o[" overloaded" ] = isOverloaded
304
- o[" relay-id" ] = relayId.toString()
305
- o[" healthy" ] = isHealthy
306
- return o
332
+ get() = OrderedJsonObject ().apply {
333
+ this [" corrected-stress" ] = correctedStress
334
+ this [" drain" ] = isDraining
335
+ this [" endpoints" ] = endpoints.get()
336
+ this [" endpoint-restart-requests" ] = endpointRestartRequestRate.getAccumulatedCount()
337
+ this [" failing-ice" ] = failingIce
338
+ this [" graceful-shutdown" ] = isInGracefulShutdown
339
+ this [" healthy" ] = isHealthy
340
+ this [" operational" ] = isOperational
341
+ this [" overloaded" ] = isOverloaded
342
+ this [" region" ] = region.toString()
343
+ this [" relay-id" ] = relayId.toString()
344
+ this [" release" ] = releaseId.toString()
345
+ this [" shutting-down" ] = isShuttingDown
346
+ this [" stress" ] = lastReportedStressLevel
347
+ this [" version" ] = version.toString()
307
348
}
308
349
309
350
companion object {
@@ -327,7 +368,7 @@ class Bridge @JvmOverloads internal constructor(
327
368
return if (myPriority != otherPriority) {
328
369
myPriority - otherPriority
329
370
} else {
330
- b1.stress .compareTo(b2.stress )
371
+ b1.correctedStress .compareTo(b2.correctedStress )
331
372
}
332
373
}
333
374
0 commit comments