diff --git a/src/piped/core.clj b/src/piped/core.clj index 78b8508..b1c4b68 100644 --- a/src/piped/core.clj +++ b/src/piped/core.clj @@ -65,7 +65,8 @@ acker-parallelism nacker-parallelism blocking-consumers - queue-visibility-timeout-seconds] + queue-visibility-timeout-seconds + acker-batch-size] :as opts}] (specs/assert-options opts) @@ -84,7 +85,7 @@ acker-chan (async/chan) nacker-chan (async/chan) pipe (async/chan) - acker-batched (utils/deadline-batching acker-chan 10) + acker-batched (utils/deadline-batching acker-chan (or acker-batch-size 10)) nacker-batched (utils/combo-batching nacker-chan 5000 10) composed-consumer (if transform-fn (comp consumer-fn transform-fn) consumer-fn)] diff --git a/src/piped/specs.clj b/src/piped/specs.clj index 58e778a..1d2e1aa 100644 --- a/src/piped/specs.clj +++ b/src/piped/specs.clj @@ -13,6 +13,7 @@ (s/def :piped/extend #{:extend}) (s/def :piped/delay-seconds nat-int?) (s/def :piped/queue-visibility-timeout-seconds pos-int?) +(s/def :piped/acker-batch-size pos-int?) (s/def :piped/action-map (s/keys :req-un [:piped/action] @@ -33,7 +34,8 @@ :piped/nacker-parallelism :piped/blocking-consumers :piped/transform-fn - :piped/queue-visibility-timeout-seconds])) + :piped/queue-visibility-timeout-seconds + :piped/acker-batch-size])) (defn assert-options [config] (if-not (s/valid? :piped/options-map config)