PQ: Add support for event-level compression using deflate #17959
Conversation
The `ackedqueue.SettingsImpl` uses an _immutable_ builder, which makes adding options cumbersome; each additional property requires modifying the code for all existing options. By introducing an api-internal temporary mutable builder, we can simplify the process of creating an immutable copy that has a single component modified.
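As a sketch of that refactor (the `Mutable` nested builder, `toMutable`, and the two fields shown here are hypothetical stand-ins, not the actual `ackedqueue.SettingsImpl` API):

```java
// Hypothetical sketch of an immutable settings object with an api-internal
// mutable builder; names and fields are illustrative only.
final class Settings {
    final int capacity;
    final long queueMaxBytes;

    private Settings(final int capacity, final long queueMaxBytes) {
        this.capacity = capacity;
        this.queueMaxBytes = queueMaxBytes;
    }

    // copy every current value into a short-lived mutable builder
    Mutable toMutable() {
        return new Mutable(this);
    }

    static final class Mutable {
        private int capacity;
        private long queueMaxBytes;

        private Mutable(final Settings source) {
            this.capacity = source.capacity;
            this.queueMaxBytes = source.queueMaxBytes;
        }

        Mutable capacity(final int capacity) {
            this.capacity = capacity;
            return this;
        }

        Mutable queueMaxBytes(final long queueMaxBytes) {
            this.queueMaxBytes = queueMaxBytes;
            return this;
        }

        // freeze back into an immutable Settings
        Settings build() {
            return new Settings(capacity, queueMaxBytes);
        }
    }
}
```

With such a builder, deriving a copy that differs in a single option becomes `settings.toMutable().queueMaxBytes(8192).build()` rather than threading every existing property through a constructor, so adding a new option no longer touches the code for all existing ones.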
This pull request does not have a backport label. Could you fix it @yaauie? 🙏
build failure was from a botched rebase that reverted an added helper; I've re-rebased and kicked a new build.
Adds non-breaking support for event compression to the persisted queue, as configured by a new per-pipeline setting `queue.compression`, which supports:

- `none` (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed
- `speed`: compression optimized for speed
- `balanced`: compression balancing speed against result size
- `size`: compression optimized for maximum reduction of size
- `disabled`: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash.

To accomplish this, we provide an abstract base implementation of the CompressionCodec whose decode method is capable of _detecting_ and decoding deflate-encoded payloads while letting other payloads through unmodified. The detection is done with a bitwise operation on the first two bytes of the payload, so no additional context is needed.

An instance of this deflate-aware compression codec is provided with a pass-through encode operation when configured with `queue.compression: none` (the default), ensuring that by default Logstash is able to decode any event that had previously been written. We provide an additional implementation that is capable of _encoding_ events with a configurable goal: speed, size, or a balance of the two.

All of these deflate-powered compression codecs use a `CleanerThreadLocal` that holds reusable resources in order to avoid unnecessary allocations, and cleans up those resources when the owning thread finishes and they become phantom-reachable.
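The description does not spell out the two-byte detection; a minimal sketch of what such a check can look like, using the standard zlib (RFC 1950) header rules rather than the PR's actual code:

```java
// Sketch of detecting a zlib/deflate payload from its first two bytes.
// This is the standard RFC 1950 header check; the PR's implementation
// may differ in detail.
static boolean looksLikeDeflate(final byte[] payload) {
    if (payload.length < 2) {
        return false;
    }
    final int cmf = payload[0] & 0xFF; // compression method and flags
    final int flg = payload[1] & 0xFF; // flags/check bits
    // the low nibble must be 8 (the deflate compression method) ...
    final boolean deflateMethod = (cmf & 0x0F) == 8;
    // ... and the two header bytes, read big-endian, are a multiple of 31
    final boolean checksumValid = ((cmf << 8) | flg) % 31 == 0;
    return deflateMethod && checksumValid;
}
```

Payloads that fail such a check are passed through unmodified, which is what makes the codec safe to run against queues written without compression.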
resolves sonarqube complaint
Since at least Java 11, both Deflater and Inflater instances have been pre-wired to the system cleaner to ensure that their native resources are cleaned up when they become phantom-reachable. Therefore we don't need our own `CleanerThreadLocal` implementation and can more simply use the java-provided `ThreadLocal` to store our wrapper instances.
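A minimal sketch of that simplification, assuming per-thread reuse of a `Deflater` via a plain `ThreadLocal` (the PR's wrapper type and compression level choices may differ):

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Since Java 11, Deflater/Inflater register their native resources with the
// system Cleaner, so a plain ThreadLocal suffices for per-thread reuse.
final class PerThreadDeflate {
    private static final ThreadLocal<Deflater> DEFLATER =
            ThreadLocal.withInitial(() -> new Deflater(Deflater.DEFAULT_COMPRESSION));

    static byte[] deflate(final byte[] input) {
        final Deflater deflater = DEFLATER.get();
        deflater.reset(); // make the reused instance safe for this payload
        deflater.setInput(input);
        deflater.finish();

        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final byte[] buffer = new byte[4096];
        while (!deflater.finished()) {
            out.write(buffer, 0, deflater.deflate(buffer));
        }
        return out.toByteArray();
    }
}
```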
💚 Build Succeeded
```ruby
LogStash::AckedQueue.file_settings_builder(path)
  .capacity(1024)
  .maxUnread(10)
  .checkpointMaxAcks(1024)
  .checkpointMaxWrites(1024)
  .queueMaxBytes(4096)
  .build
```
```java
if (encodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) {
    return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES));
}
```
review note: if the thread never encounters a compressed payload that is small enough to safely use a shared (and therefore thread-permanent) BAOS, this check is what prevents us from taking on the overhead of allocating that shareable BAOS.
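`offerAndGet` is internal to this PR; as a hypothetical sketch of the idea the note describes, a lazily-populated per-thread reference allocates the long-lived resource only on first qualifying use:

```java
import java.util.function.Supplier;

// Hypothetical sketch: the shared resource is allocated the first time a
// caller qualifies for it, so threads that never see a small-enough payload
// never pay for a thread-permanent allocation.
final class LazyPerThreadRef<T> {
    private final ThreadLocal<T> ref = new ThreadLocal<>();

    T offerAndGet(final Supplier<T> supplier) {
        T value = ref.get();
        if (value == null) {
            value = supplier.get(); // first qualifying use on this thread
            ref.set(value);
        }
        return value;
    }
}
```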
```java
 * is called against each object when the {@link CleanerThreadLocal} no longer holds a reference to it.
 * @param <T> the external value type
 */
public final class CleanerThreadLocal<T> {
```
review note: the `CleanerThreadLocal` ended up not being necessary, and is removed in a later commit.
I've tested this in multiple scenarios and combinations and everything seems to work. The main challenge in testing this is that our laptops have very fast disks, so it's necessary to use something like a VM with gp2 or similar volumes.
Two main concerns I've raised 1:1 but should also write down here:
- The choice of default compression algorithm: other than convenience, there is no good reason to use zlib instead of zstd (used in kafka, rocksdb, hadoop, etc.) or lz4 (cassandra, kafka, etc.). Given how sensitive this feature is to the IO/CPU trade-off, it feels like we should consider a different default compression algorithm, as we're unlikely to introduce a second one once this is out. Users will also often not realize that their bottleneck is the algorithm or its implementation.
- It's worth documenting some guidance on how to decide whether to use this (e.g., examples of hot_threads output evidencing disk IO pressure).
```java
        yield CompressionCodec.NOOP;
    }
    case "none" -> {
        logger.info("compression support is enabled (read-only)");
```
A log entry saying "read-only" could confuse users into thinking the PQ is in a read-only mode.
logger.info("compression support is enabled (read-only)"); | |
logger.info("compression support is enabled (decompression only)"); |
```java
        yield DeflateAwareCompressionCodec.getInstance();
    }
    case "speed" -> {
        logger.info("compression support is enabled (goal: speed)");
```
"Goal" is a very comp-sci term, and it conveys a sense of "we'll do our best, but no promises". I don't think it's necessary to pass this subjectivity on to users, so maybe something like:
logger.info("compression support is enabled (goal: speed)"); | |
logger.info("Compression is enabled - level: \"speed\""); |
Or logger.info("Compression level set to: speed");
`queue.compression`
: Sets the event compression goal for use with the persisted queue. Default is `none`. Acceptable values include:
  * `speed`: optimize for fastest compression operation
  * `size`: optimize for smallest size on disk, spending more CPU
  * `balanced`: a balance between the `speed` and `size` settings
Can we provide some guidance here on what users should look for in their metrics to decide when to turn this on? For example, observing mmap operations across multiple consecutive API calls to hot_threads, and/or a combination of low worker utilization and high queue backpressure.
Release notes

Adds support for event compression in the persisted queue, controlled by the per-pipeline `queue.compression` setting, which defaults to `none`.

What does this PR do?
Adds non-breaking support for event compression to the persisted queue, as configured by a new per-pipeline setting `queue.compression`, which supports:

- `none` (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed
- `speed`: compression optimized for speed
- `balanced`: compression balancing speed against result size
- `size`: compression optimized for maximum reduction of size
- `disabled`: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash.

This PR does necessary refactors as no-op stand-alone commits to make reviewing more straightforward. It is best reviewed in commit order.
Why is it important/What is the impact to the user?
Disk IO is often a performance bottleneck when using the PQ. This feature allows users to spend CPU to reduce the size of events on disk, and therefore also the disk IO.
Checklist
How to test this PR locally
example-input.ndjson
with event contents-S
to setqueue.type=persisted
,queue.drain=true
, andqueue.compression=size
:lsq-pagedump
:Related issues
Use cases