
Conversation

Member

@yaauie yaauie commented Aug 11, 2025

Release notes

Adds support for event compression in the persisted queue, controlled by the per-pipeline queue.compression setting, which defaults to none.

What does this PR do?

Adds non-breaking support for event compression to the persisted queue, as
configured by a new per-pipeline setting queue.compression, which supports:

  • none (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed
  • speed: compression optimized for speed
  • balanced: compression balancing speed against result size
  • size: compression optimized for maximum reduction of size
  • disabled: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash.
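For intuition, the three active goals map naturally onto deflate compression levels. A minimal sketch of such a mapping, assuming a deflate-backed codec (the levels actually chosen by this PR may differ):

    import java.util.zip.Deflater;

    // hedged sketch: a plausible mapping of queue.compression goals onto
    // java.util.zip.Deflater levels; the PR's actual choices may differ
    static int deflaterLevelFor(final String goal) {
        return switch (goal) {
            case "speed"    -> Deflater.BEST_SPEED;          // level 1
            case "balanced" -> Deflater.DEFAULT_COMPRESSION; // zlib default (level 6)
            case "size"     -> Deflater.BEST_COMPRESSION;    // level 9
            default -> throw new IllegalArgumentException("unsupported compression goal: " + goal);
        };
    }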

This PR performs the necessary refactors as no-op stand-alone commits to make reviewing more straightforward. It is best reviewed in commit order.

Why is it important/What is the impact to the user?

Disk IO is often a performance bottleneck when using the PQ. This feature allows users to spend CPU to reduce the size of events on disk, and therefore also the disk IO.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

How to test this PR locally

  • Add an ndjson file named example-input.ndjson containing events
  • Run Logstash with trace-logging enabled, using -S to set queue.type=persisted, queue.drain=true, and queue.compression=size:
    bin/logstash --log.level=trace \
    -Squeue.type=persisted \
    -Squeue.drain=true \
    -Squeue.compression=size \
    --config.string 'input { stdin { codec => json_lines } } output { sink {} }' < example-input.ndjson
    
  • Observe trace logs showing compression and decompression:
    [2025-08-12T13:48:45,723][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 11184->3743
    [2025-08-12T13:48:45,723][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 12057->3965
    [2025-08-12T13:48:45,723][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 11254->3718
    [2025-08-12T13:48:45,723][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 11592->3739
    
    [2025-08-12T13:48:45,723][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3645->10880
    [2025-08-12T13:48:45,724][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3713->11363
    [2025-08-12T13:48:45,724][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3658->10939
    [2025-08-12T13:48:45,724][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3750->10934
    
  • Inspect the page(s) left behind with lsq-pagedump:

Related issues

Use cases

  • Constrained or metered disk IO
  • Limited Disk capacity

yaauie added 4 commits August 11, 2025 14:45
The `ackedqueue.SettingsImpl` uses an _immutable_ builder, which makes
adding options cumbersome; each additional property requires modifying
the code for all existing options.

By introducing an api-internal temporary mutable builder, we can simplify
the process of creating an immutable copy that has a single component
modified.
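
A minimal sketch of the resulting pattern (names here are illustrative, not the actual SettingsImpl API):

    // illustrative sketch: immutable settings with an api-internal mutable
    // builder; these names are NOT the actual ackedqueue.SettingsImpl API
    final class QueueSettings {
        private final int capacity;
        private final String compressionGoal;

        private QueueSettings(final Builder builder) {
            this.capacity = builder.capacity;
            this.compressionGoal = builder.compressionGoal;
        }

        // copy with a single component modified, leaving all others untouched
        QueueSettings withCompressionGoal(final String compressionGoal) {
            return new Builder(this).compressionGoal(compressionGoal).build();
        }

        private static final class Builder {
            private int capacity;
            private String compressionGoal;

            Builder(final QueueSettings base) { // seeded from an immutable instance
                this.capacity = base.capacity;
                this.compressionGoal = base.compressionGoal;
            }

            Builder compressionGoal(final String value) {
                this.compressionGoal = value;
                return this;
            }

            QueueSettings build() {
                return new QueueSettings(this);
            }
        }
    }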
Contributor

mergify bot commented Aug 11, 2025

This pull request does not have a backport label. Could you fix it @yaauie? 🙏
To fix up this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit.
  • If no backport is necessary, please add the backport-skip label


@yaauie yaauie added the backport-skip Skip automated backport with mergify label Aug 11, 2025
Member Author

yaauie commented Aug 12, 2025

Build failure was from a botched rebase that reverted an added helper; I've re-rebased and kicked off a new build.

yaauie added 2 commits August 12, 2025 02:50
Adds non-breaking support for event compression to the persisted queue, as
configured by a new per-pipeline setting `queue.compression`, which supports:

 - `none` (default): no compression is performed, but if compressed events
                     are encountered in the queue they will be decompressed
 - `speed`: compression optimized for speed
 - `balanced`: compression balancing speed against result size
 - `size`: compression optimized for maximum reduction of size
 - `disabled`: compression support entirely disabled; if a pipeline is run
               in this configuration against a PQ that already contains
               unacked compressed events, the pipeline WILL crash.

To accomplish this, we then provide an abstract base implementation of the
CompressionCodec whose decode method is capable of _detecting_ and decoding
deflate-encoded payload while letting other payloads through unmodified.
The detection is done with a bitwise operation on the first two bytes of the
payload, so no additional context is needed.
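
A minimal sketch of that detection, following the RFC 1950 zlib header layout (illustrative; the exact check in the PR may differ):

    // RFC 1950: byte 0 (CMF) carries the compression method in its low
    // nibble (8 = deflate), and the two-byte header (CMF<<8 | FLG) must be
    // a multiple of 31 (the FCHECK constraint)
    static boolean looksLikeDeflate(final byte[] payload) {
        if (payload.length < 2) {
            return false;
        }
        final int cmf = payload[0] & 0xFF;
        final int flg = payload[1] & 0xFF;
        return (cmf & 0x0F) == 0x08 && ((cmf << 8) | flg) % 31 == 0;
    }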

An instance of this deflate-aware compression codec is provided with a
pass-through encode operation when configured with `queue.compression: none`,
which is the default, ensuring that by default logstash is able to decode any
event that had previously been written.

We provide an additional implementation that is capable of _encoding_ events
with a configurable goal: speed, size, or a balance of the two.

All of these deflate-powered compression codecs use a `CleanerThreadLocal`
that holds reusable resources in order to avoid unnecessary allocations, and
cleans up those resources when the owning thread finishes and they become
phantom-reachable.
Resolves SonarQube complaint.
Since at least Java 11, both Deflater and Inflater instances have been
pre-wired to the system cleaner to ensure that their native resources are
cleaned up when they become phantom-reachable. Therefore we don't need
our own `CleanerThreadLocal` implementation and can more simply use the
java-provided `ThreadLocal` to store our wrapper instances.
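
A minimal sketch of that simplification (illustrative, not the PR's exact code):

    import java.io.ByteArrayOutputStream;
    import java.util.zip.Deflater;

    // one reusable Deflater per thread; the JDK's built-in cleaner reclaims
    // its native resources once the value becomes phantom-reachable
    private static final ThreadLocal<Deflater> DEFLATER =
            ThreadLocal.withInitial(() -> new Deflater(Deflater.BEST_COMPRESSION));

    static byte[] deflate(final byte[] input) {
        final Deflater deflater = DEFLATER.get();
        deflater.reset(); // reuse across calls on the same thread
        deflater.setInput(input);
        deflater.finish();
        final ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(32, input.length / 2));
        final byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        return out.toByteArray();
    }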
@elasticmachine
Collaborator

💚 Build Succeeded


Comment on lines +130 to +136

    LogStash::AckedQueue.file_settings_builder(path)
        .capacity(1024)
        .maxUnread(10)
        .checkpointMaxAcks(1024)
        .checkpointMaxWrites(1024)
        .queueMaxBytes(4096)
        .build

yaauie (Member Author)

Suggested change

    LogStash::AckedQueue.file_settings_builder(path)
        .capacity(1024)
        .maxUnread(10)
        .checkpointMaxAcks(1024)
        .checkpointMaxWrites(1024)
        .queueMaxBytes(4096)
        .build

Comment on lines +120 to +122

    if (encodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) {
        return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES));
    }

yaauie (Member Author)

review note: if the thread never encounters a compressed payload that is small enough to safely use a shared (and therefore thread-permanent) BAOS, this is the bit that prevents us from taking on the overhead of that shareable BAOS.
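
To make the note concrete, a hedged sketch of how such a method might look (the threshold branch is from the excerpt above; the fallback scaffolding is assumed):

    private ByteArrayOutputStream outputStreamFor(final int encodedSize) {
        if (encodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) {
            // small payload: lazily create, then reuse, the thread-permanent buffer
            return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES));
        }
        // large payload: one-off buffer, so the shared buffer is never created
        // (or permanently grown) on behalf of an oversized payload
        return new ByteArrayOutputStream(encodedSize);
    }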

     * is called against each object when the {@link CleanerThreadLocal} no longer holds a reference to it.
     * @param <T> the external value type
     */
    public final class CleanerThreadLocal<T> {

yaauie (Member Author)

review note: the CleanerThreadLocal ended up not being necessary, and is removed in a later commit.

@yaauie yaauie requested a review from jsvd August 13, 2025 16:16
Member

@jsvd jsvd left a comment
I've tested this in multiple scenarios and combinations and everything seems to work. The main challenge in testing this is that our laptops have very, very fast disks, so it's necessary to use something like a VM with gp2 or similar volumes.

Two main concerns I've raised 1:1 but should write here:

  1. The choice of default compression algorithm. Other than convenience, there is no good reason to use zlib instead of zstd (used in Kafka, RocksDB, Hadoop, etc.) or lz4 (Cassandra, Kafka, etc.). Given how sensitive this feature is to the IO/CPU trade-off, it feels like we should consider a different default compression algorithm, as we're unlikely to introduce a second one once this is out. Users will also often not realize that their bottleneck is the algorithm or its implementation.
  2. It's worth documenting some guidance on how to decide whether to use this (examples of hot_threads output evidencing disk I/O pressure).

        yield CompressionCodec.NOOP;
    }
    case "none" -> {
        logger.info("compression support is enabled (read-only)");

jsvd (Member)

A log entry saying "read-only" can make users think the PQ is in a read-only mode.

Suggested change

    - logger.info("compression support is enabled (read-only)");
    + logger.info("compression support is enabled (decompression only)");

        yield DeflateAwareCompressionCodec.getInstance();
    }
    case "speed" -> {
        logger.info("compression support is enabled (goal: speed)");

jsvd (Member)

"Goal" is a very comp-sci term, and it conveys a sense of "we'll do our best, but no promises". I don't think it's necessary to pass this subjectivity on to users, so maybe something like:

Suggested change

    - logger.info("compression support is enabled (goal: speed)");
    + logger.info("Compression is enabled - level: \"speed\"");

Or logger.info("Compression level set to: speed");

Comment on lines +84 to +88

    `queue.compression`
    : Sets the event compression goal for use with the persisted queue. Default is `none`. Acceptable values include:
      * `speed`: optimize for fastest compression operation
      * `size`: optimize for smallest size on disk, spending more CPU
      * `balanced`: a balance between the `speed` and `size` settings

jsvd (Member)

Can we provide some guidance here on what users should look for in their metrics to decide when to turn this on?

For example, observing mmap operations across multiple consecutive API calls to hot_threads, and/or a combination of low worker utilization and high queue backpressure.

@yaauie
Member Author

yaauie commented Sep 2, 2025

  1. I tried with zstd, which achieves better compression with significantly less CPU overhead. I will be abandoning the deflate-specific bits here and opening a replacement PR using zstd.
  2. I need to wire the metrics provider up to the compression codec so that we can make ratios and timings available.

Labels
backport-skip (Skip automated backport with mergify), enhancement, persistent queues