
Commit 3ece5f8

Respect concurrency limits when releasing claimed executions during shutdown
When a worker gracefully shuts down, claimed executions are released back to ready state via ClaimedExecution#release. Previously, this always used dispatch_bypassing_concurrency_limits, which ignored whether another job with the same concurrency key was already running.

This could cause duplicate concurrent executions in the following scenario:

1. Job A starts running with concurrency key "X" (semaphore value = 0)
2. Time passes, semaphore expires and is deleted
3. Job B with same key "X" enqueues, creates new semaphore, starts running
4. Worker running Job A receives shutdown signal
5. Job A is released via dispatch_bypassing_concurrency_limits
6. Job A goes to ready state, gets picked up by another worker
7. Both Job A and Job B now running concurrently (violates limit!)

Fix: Before releasing a job, check if any other jobs with the same concurrency key are currently executing. If so, go through normal dispatch which respects the concurrency policy (block or discard). If not, continue to bypass limits for performance.

This ensures that graceful shutdown doesn't violate concurrency guarantees, even when semaphores have expired during long-running job execution.
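
The scenario and the fix can be replayed with a small in-memory model. This is only a sketch: ToyJob, ToyQueue and run_scenario are hypothetical names invented for illustration, the semaphore is a plain hash with a limit of 1, and only the "block" policy is modelled. None of it is Solid Queue's actual implementation.

```ruby
# Toy model of the shutdown race. All names are hypothetical.
ToyJob = Struct.new(:name, :key, :state)

class ToyQueue
  def initialize
    @jobs = []
    @semaphores = {} # concurrency key => remaining slots (limit of 1 assumed)
  end

  # Normal dispatch: take a slot if one is free, otherwise block the job.
  def dispatch(job)
    @semaphores[job.key] = 1 unless @semaphores.key?(job.key)
    if @semaphores[job.key] > 0
      @semaphores[job.key] -= 1
      job.state = :ready
    else
      job.state = :blocked
    end
    @jobs << job unless @jobs.include?(job)
    job
  end

  def claim(job)
    job.state = :claimed
  end

  # Simulates the semaphore expiring and being deleted.
  def expire_semaphore(key)
    @semaphores.delete(key)
  end

  # In-memory stand-in for the check added by this commit.
  def other_claimed_with_same_key?(job)
    @jobs.any? do |other|
      !other.equal?(job) && other.key == job.key && other.state == :claimed
    end
  end

  # Release on shutdown. check: false mimics the old, always-bypass behaviour.
  def release(job, check:)
    if check && other_claimed_with_same_key?(job)
      dispatch(job)      # respects the concurrency limit
    else
      job.state = :ready # bypasses the concurrency limit
    end
  end
end

def run_scenario(check:)
  queue = ToyQueue.new
  a = ToyJob.new("A", "X", nil)
  b = ToyJob.new("B", "X", nil)

  queue.dispatch(a)
  queue.claim(a)                 # step 1: Job A running
  queue.expire_semaphore("X")    # step 2: semaphore expires
  queue.dispatch(b)
  queue.claim(b)                 # step 3: Job B running
  queue.release(a, check: check) # steps 4-5: shutdown releases Job A

  [a.state, b.state]
end
```

In this model, run_scenario(check: false) leaves Job A ready while Job B is still claimed, which is the duplicate-execution window described above. With check: true, Job A ends up blocked until Job B's slot is freed.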
1 parent 52b3d7b commit 3ece5f8

2 files changed, +34 -1 lines changed

app/models/solid_queue/claimed_execution.rb

Lines changed: 16 additions & 1 deletion
@@ -78,7 +78,13 @@ def perform
   def release
     SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do
       transaction do
-        job.dispatch_bypassing_concurrency_limits
+        if other_executions_holding_concurrency_lock?
+          # Another job with same concurrency key is already running.
+          # Go through normal dispatch which respects concurrency limits.
+          job.dispatch
+        else
+          job.dispatch_bypassing_concurrency_limits
+        end
         destroy!
       end
     end
@@ -113,4 +119,13 @@ def finished
         destroy!
       end
     end
+
+    def other_executions_holding_concurrency_lock?
+      return false unless job.concurrency_limited?
+
+      SolidQueue::Job.joins(:claimed_execution)
+        .where(concurrency_key: job.concurrency_key)
+        .where.not(id: job.id)
+        .exists?
+    end
 end

test/models/solid_queue/claimed_execution_test.rb

Lines changed: 18 additions & 0 deletions
@@ -62,6 +62,24 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
     assert job.reload.ready?
   end
 
+  test "release bypasses concurrency limits when no other job with same key is executing" do
+    job_result = JobResult.create!(queue_name: "default", status: "")
+
+    # Create Job A with concurrency limit and claim it
+    job_a = DiscardableUpdateResultJob.perform_later(job_result, name: "A")
+    solid_queue_job_a = SolidQueue::Job.find_by(active_job_id: job_a.job_id)
+    SolidQueue::ReadyExecution.claim(solid_queue_job_a.queue_name, 1, @process.id)
+    claimed_execution_a = SolidQueue::ClaimedExecution.find_by(job_id: solid_queue_job_a.id)
+    assert claimed_execution_a
+
+    # Release job A - no other job with same key is running, so it should go to ready
+    assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::ReadyExecution.count } => 1 do
+      claimed_execution_a.release
+    end
+
+    assert solid_queue_job_a.reload.ready?
+  end
+
   test "fail with error" do
     claimed_execution = prepare_and_claim_job AddToBufferJob.perform_later(42)
     job = claimed_execution.job
