Skip to content

Commit cafd30e

Browse files
author
Jan Boroń
committed
[FIX] Implement actual ring buffer to prevent memory leaks when influx is down
1 parent 9cafe77 commit cafd30e

File tree

3 files changed

+18
-18
lines changed

3 files changed

+18
-18
lines changed

build.sbt

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ val defaultScalaVersion = "3.3.1"
55
val scalaVersions = Seq("2.11.12", "2.12.18", "2.13.12", defaultScalaVersion)
66

77
val asyncHttpClientV = "2.12.3"
8+
val commonsCollectionsV = "4.4"
89
val dropwizardMetricsV = "4.0.2"
910
val findbugsV = "3.0.1"
1011
val hikariCPV = "3.2.0"
@@ -96,6 +97,7 @@ lazy val core = project.in(file("core"))
9697
"io.dropwizard.metrics" % "metrics-core" % dropwizardMetricsV,
9798
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCompatV,
9899
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
100+
"org.apache.commons" % "commons-collections4" % commonsCollectionsV,
99101

100102
"ch.qos.logback" % "logback-classic" % logbackV % Test,
101103
"org.scalatest" %% "scalatest" % scalaTestV % Test,

core/src/main/scala/influxdbreporter/core/WriterDataBuffer.scala

+9-11
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,29 @@
1616
package influxdbreporter.core
1717

1818
import influxdbreporter.core.writers.WriterData
19+
import org.apache.commons.collections4.queue.CircularFifoQueue
1920

20-
import scala.collection.immutable.List
21-
import scala.collection.mutable.ListBuffer
21+
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SeqHasAsJava}
2222

2323
trait WriterDataBuffer[T] {
2424

25-
def update(add: List[WriterData[T]] = Nil, remove: List[WriterData[T]] = Nil): List[WriterData[T]]
25+
def update(add: List[WriterData[T]] = Nil, remove: List[WriterData[T]] = Nil): Unit
2626

2727
def get(): List[WriterData[T]]
2828
}
2929

3030
class FixedSizeWriterDataBuffer[T](maxSize: Int)
3131
extends WriterDataBuffer[T] {
3232

33-
private var ringBuffer: ListBuffer[WriterData[T]] = ListBuffer.empty
33+
private val buffer: CircularFifoQueue[WriterData[T]] = new CircularFifoQueue[WriterData[T]](maxSize)
3434

35-
override def update(add: List[WriterData[T]] = Nil, remove: List[WriterData[T]] = Nil): List[WriterData[T]] = {
36-
if (add.nonEmpty || remove.nonEmpty) synchronized {
37-
ringBuffer --= remove
38-
add ++=: ringBuffer
39-
ringBuffer = ringBuffer.distinct.take(maxSize)
35+
override def update(add: List[WriterData[T]] = Nil, remove: List[WriterData[T]] = Nil): Unit = {
36+
if(add.nonEmpty || remove.nonEmpty) {
37+
buffer.removeAll(remove.asJava)
38+
buffer.addAll(add.asJava)
4039
}
41-
ringBuffer.toList
4240
}
4341

44-
override def get(): List[WriterData[T]] = ringBuffer.toList
42+
override def get(): List[WriterData[T]] = buffer.asScala.toList
4543

4644
}

core/src/test/scala/influxdbreporter/core/FixedSizeWriterDataBufferTests.scala

+7-7
Original file line numberDiff line numberDiff line change
@@ -30,27 +30,27 @@ class FixedSizeWriterDataBufferTests extends AnyWordSpec {
3030
buffer.get()
3131
}
3232

33-
assertResult(wd(4) :: wd(5) :: wd(1) :: wd(2) :: Nil) {
33+
assertResult(wd(2) :: wd(3) :: wd(4) :: wd(5) :: Nil) {
3434
buffer.update(add = wd(4) :: wd(5) :: Nil)
3535
buffer.get()
3636
}
3737

38-
assertResult(wd(4) :: wd(5) :: wd(2) :: Nil) {
39-
buffer.update(remove = wd(1) :: Nil)
38+
assertResult(wd(3) :: wd(4) :: wd(5) :: Nil) {
39+
buffer.update(remove = wd(2) :: Nil)
4040
buffer.get()
4141
}
4242

43-
assertResult(wd(6) :: wd(7) :: wd(4) :: wd(2) :: Nil) {
43+
assertResult(wd(3) :: wd(4) :: wd(6) :: wd(7) :: Nil) {
4444
buffer.update(add = wd(6) :: wd(7) :: Nil, remove = wd(5) :: Nil)
4545
buffer.get()
4646
}
4747

48-
assertResult(wd(7) :: wd(4) :: Nil) {
49-
buffer.update(add = wd(7) :: Nil, remove = wd(6) :: wd(2) :: Nil)
48+
assertResult(wd(3) :: wd(4) :: wd(7) :: wd(7) :: Nil) {
49+
buffer.update(add = wd(7) :: Nil, remove = wd(6) :: Nil)
5050
buffer.get()
5151
}
5252
}
5353
}
5454

5555
private def wd(num: Int) = new WriterData[String](num.toString)
56-
}
56+
}

0 commit comments

Comments
 (0)