Add GenericEnqueuer for consistent job priorities
Some delayed jobs enqueue other delayed jobs. For example, an app delete job
may enqueue secondary jobs like blobstore deletion and buildpack cache cleanup.
If CAPI is configured with dedicated job priorities, or if dynamic job priorities
are enabled, these secondary jobs might unintentionally receive a higher priority
than their primary job. This could lead to less critical jobs, like blobstore deletion,
being processed before more critical ones, such as service instance creation.

This change introduces `GenericEnqueuer`, a singleton enqueuer ensuring that all
jobs enqueued within the same job execution context inherit the same priority.
It is automatically initialized and destroyed within the CCJob wrapper, ensuring
consistent priority propagation. Jobs can now be enqueued using:
`GenericEnqueuer.shared.enqueue(job)`
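
The lifecycle described above can be illustrated with a minimal sketch. It is not the CAPI code: `SketchEnqueuer`, `SketchJob` and `FakeDelayedJob` are hypothetical stand-ins, and the sketch only assumes that the job wrapper has `before`/`after` hooks that receive an object responding to `priority`.

```ruby
# Hypothetical stand-in for the shared enqueuer; it only remembers a base priority.
class SketchEnqueuer
  attr_reader :priority

  def initialize(priority: nil)
    @priority = priority
  end

  # Return the instance stored on Thread.current, or build one.
  # An explicit priority always builds a fresh instance, which is stored
  # only when no instance is stored yet.
  def self.shared(priority: nil)
    stored = Thread.current[:sketch_enqueuer]
    return stored if stored && priority.nil?

    fresh = new(priority: priority)
    Thread.current[:sketch_enqueuer] ||= fresh
    fresh
  end

  # Forget the stored instance so the next job starts from a clean state.
  def self.reset!
    Thread.current[:sketch_enqueuer] = nil
  end
end

# Hypothetical job wrapper: set up the shared enqueuer before the job runs
# and tear it down afterwards.
class SketchJob
  def before(delayed_job)
    SketchEnqueuer.shared(priority: delayed_job.priority)
  end

  def after(_delayed_job)
    SketchEnqueuer.reset!
  end
end

FakeDelayedJob = Struct.new(:priority)

job = SketchJob.new
job.before(FakeDelayedJob.new(100))
inside_priority = SketchEnqueuer.shared.priority  # base priority seen while the job runs
job.after(nil)
outside_priority = SketchEnqueuer.shared.priority # base priority seen outside any job
```

Inside the job, every call to `shared` sees the priority of the running job; after `reset!`, a fresh instance without a base priority is created on demand.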

- **Priority Incrementation:**
  - `enqueue` and `enqueue_pollable` now support an optional `priority_increment` parameter.
  - This increases the priority value (making the job less important), allowing
    secondary jobs (e.g., blobstore deletion) to have a lower priority than their parent job.
- **Preserving Priority for Re-enqueued Jobs:**
  - The `preserve_priority` flag ensures that re-enqueued jobs retain their previous priority.
  - This skips the config priority and `priority_increment`, so the priority value does not grow on each repeated execution.
- **Config Priority Handling:**
  - If a job has a dedicated priority configured, it is added to the current priority
    and `priority_increment`, ensuring correct propagation.

| Job Type               | Preserve Priority | Config Priority | Increment | Final Priority                       |
|------------------------|-------------------|-----------------|-----------|--------------------------------------|
| **Parent Job**         | ❌ No             | `100`           | `nil`     | **100**                              |
| **Sub-Job A**          | ❌ No             | `200`           | `50`      | **350** (100+200+50)                 |
| **Sub-Job B**          | ❌ No             | `nil`           | `50`      | **150** (100+50)                     |
| **Sub-Job C**          | ❌ No             | `nil`           | `nil`     | **100** (inherits parent)            |
| **Tertiary Job A1**    | ❌ No             | `50`            | `20`      | **420** (350+50+20)                  |
| **Tertiary Job B1**    | ✅ Yes            | `10` (ignored)  | `30`      | **150** (preserved from Sub-Job B)   |
| **Tertiary Job C1**    | ❌ No             | `nil`           | `50`      | **150** (100+50)                     |
| **Re-enqueued Job**    | ✅ Yes            | `20` (ignored)  | `100`     | **42** (original preserved priority) |
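
The rows above can be reproduced with a small calculation. This is a sketch of the rule as described in this message, not the enqueuer itself: `final_priority` is a hypothetical helper, and the parent job is assumed to be enqueued outside any job context (base priority `0`).

```ruby
# Hypothetical helper mirroring the priority rule described above.
# base:      priority of the enqueuing context (parent job), 0 if none
# config:    dedicated priority configured for the job, or nil
# increment: optional priority_increment, or nil (negative values count as 0)
# preserve:  when true, config and increment are ignored
def final_priority(base:, config: nil, increment: nil, preserve: false)
  total = base || 0
  return total if preserve

  total += config || 0
  total += [increment, 0].max if increment
  total
end

parent   = final_priority(base: 0, config: 100)
sub_a    = final_priority(base: parent, config: 200, increment: 50)
sub_b    = final_priority(base: parent, increment: 50)
sub_c    = final_priority(base: parent)
tert_a1  = final_priority(base: sub_a, config: 50, increment: 20)
tert_b1  = final_priority(base: sub_b, config: 10, increment: 30, preserve: true)
tert_c1  = final_priority(base: sub_c, increment: 50)
requeued = final_priority(base: 42, config: 20, increment: 100, preserve: true)

puts [parent, sub_a, sub_b, sub_c, tert_a1, tert_b1, tert_c1, requeued].inspect
```

Note that the enqueuer in this change only passes a priority to DelayedJob when the result is greater than 0; the sketch leaves that detail out.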

Co-authored-by: Johannes Haass <[email protected]>
johha committed Mar 3, 2025
1 parent fad2da1 commit c703fc7
Showing 31 changed files with 308 additions and 65 deletions.
2 changes: 1 addition & 1 deletion app/actions/app_delete.rb
@@ -97,7 +97,7 @@ def route_mappings_to_delete(app)

 def delete_buildpack_cache(app)
   delete_job = Jobs::V3::BuildpackCacheDelete.new(app.guid)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(delete_job)
+  Jobs::GenericEnqueuer.shared.enqueue(delete_job, priority_increment: Jobs::REDUCED_PRIORITY)
 end

 def logger
2 changes: 1 addition & 1 deletion app/actions/buildpack_delete.rb
@@ -8,7 +8,7 @@ def delete(buildpacks)
   end
   if buildpack.key
     blobstore_delete = Jobs::Runtime::BlobstoreDelete.new(buildpack.key, :buildpack_blobstore)
-    Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(blobstore_delete)
+    Jobs::GenericEnqueuer.shared.enqueue(blobstore_delete, priority_increment: Jobs::REDUCED_PRIORITY)
   end
 end

2 changes: 1 addition & 1 deletion app/actions/droplet_copy.rb
@@ -60,7 +60,7 @@ def copy_buildpack_droplet(new_droplet)
       new_droplet.buildpack_lifecycle_data(reload: true)

       copy_job = Jobs::V3::DropletBitsCopier.new(@source_droplet.guid, new_droplet.guid)
-      Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(copy_job)
+      Jobs::GenericEnqueuer.shared.enqueue(copy_job)
     end
   end
 end
2 changes: 1 addition & 1 deletion app/actions/droplet_delete.rb
@@ -16,7 +16,7 @@ def delete(droplets)

 if droplet.blobstore_key
   blobstore_delete = Jobs::Runtime::BlobstoreDelete.new(droplet.blobstore_key, :droplet_blobstore)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(blobstore_delete)
+  Jobs::GenericEnqueuer.shared.enqueue(blobstore_delete, priority_increment: Jobs::REDUCED_PRIORITY)
 end

 Repositories::DropletEventRepository.record_delete(
3 changes: 2 additions & 1 deletion app/actions/mixins/bindings_delete.rb
@@ -1,5 +1,6 @@
 require 'jobs/queues'
 require 'jobs/enqueuer'
+require 'jobs/generic_enqueuer'
 require 'jobs/v3/delete_binding_job'
 require 'jobs/v3/delete_service_binding_job_factory'

@@ -19,7 +20,7 @@ def delete_bindings(bindings, user_audit_info:)
   result = binding_delete_action.delete(binding)
   unless result[:finished]
     polling_job = DeleteBindingJob.new(type, binding.guid, user_audit_info:)
-    Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(polling_job)
+    Jobs::GenericEnqueuer.shared.enqueue_pollable(polling_job)
     unbinding_operation_in_progress!(binding)
   end
 rescue StandardError => e
2 changes: 1 addition & 1 deletion app/actions/package_copy.rb
@@ -20,7 +20,7 @@ def copy(destination_app_guid:, source_package:, user_audit_info:, record_event:
 package.db.transaction do
   package.save

-  @enqueued_job = Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(Jobs::V3::PackageBitsCopier.new(source_package.guid, package.guid)) if source_package.type == 'bits'
+  @enqueued_job = Jobs::GenericEnqueuer.shared.enqueue(Jobs::V3::PackageBitsCopier.new(source_package.guid, package.guid)) if source_package.type == 'bits'

   record_audit_event(package, source_package, user_audit_info) if record_event
 end
4 changes: 3 additions & 1 deletion app/actions/package_delete.rb
@@ -1,3 +1,5 @@
+require 'jobs/generic_enqueuer'
+
 module VCAP::CloudController
   class PackageDelete
     def initialize(user_audit_info)
@@ -10,7 +12,7 @@ def delete(packages)
 packages.each do |package|
   unless package.docker?
     package_src_delete_job = create_package_source_deletion_job(package)
-    Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(package_src_delete_job) if package_src_delete_job
+    Jobs::GenericEnqueuer.shared.enqueue(package_src_delete_job, priority_increment: Jobs::REDUCED_PRIORITY) if package_src_delete_job
   end

   package.destroy
2 changes: 1 addition & 1 deletion app/actions/routing/route_delete.rb
@@ -15,7 +15,7 @@ def delete_sync(route:, recursive:)
 end

 def delete_async(route:, recursive:)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(do_delete(recursive, route))
+  Jobs::GenericEnqueuer.shared.enqueue(do_delete(recursive, route))
 end

 def delete_unmapped_route(route:)
2 changes: 1 addition & 1 deletion app/actions/service_broker_create.rb
@@ -31,7 +31,7 @@ def create(message)
   service_event_repository.record_broker_event_with_request(:create, broker, message.audit_hash)

   synchronization_job = SynchronizeBrokerCatalogJob.new(broker.guid, user_audit_info: service_event_repository.user_audit_info)
-  pollable_job = Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(synchronization_job)
+  pollable_job = Jobs::GenericEnqueuer.shared.enqueue_pollable(synchronization_job)
 end

 { pollable_job: }
2 changes: 1 addition & 1 deletion app/actions/services/locks/deleter_lock.rb
@@ -49,7 +49,7 @@ def unlock_and_destroy!

 def enqueue_and_unlock!(attributes_to_update, job)
   service_instance.save_and_update_operation(attributes_to_update)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(job)
+  Jobs::GenericEnqueuer.shared.enqueue(job)
   @needs_unlock = false
 end

2 changes: 1 addition & 1 deletion app/actions/services/locks/updater_lock.rb
@@ -47,7 +47,7 @@ def asynchronous_unlock!
 end

 def enqueue_unlock!(job)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(job)
+  Jobs::GenericEnqueuer.shared.enqueue(job)
   @needs_unlock = false
 end

2 changes: 1 addition & 1 deletion app/actions/services/service_instance_create.rb
@@ -48,7 +48,7 @@ def setup_async_job(request_attrs, service_instance)
     @services_event_repository.user_audit_info,
     request_attrs
   )
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(job)
+  Jobs::GenericEnqueuer.shared.enqueue(job)
   @services_event_repository.record_service_instance_event(:start_create, service_instance, request_attrs)
 end

2 changes: 1 addition & 1 deletion app/actions/space_delete.rb
@@ -56,7 +56,7 @@ def delete_service_instances(space_model)
   result = service_instance_deleter.delete
   unless result[:finished]
     polling_job = V3::DeleteServiceInstanceJob.new(service_instance.guid, @services_event_repository.user_audit_info)
-    Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(polling_job)
+    Jobs::GenericEnqueuer.shared.enqueue_pollable(polling_job)
     errors << CloudController::Errors::ApiError.new_from_details('AsyncServiceInstanceOperationInProgress', service_instance.name)
   end
 rescue StandardError => e
2 changes: 1 addition & 1 deletion app/actions/v2/services/service_binding_create.rb
@@ -52,7 +52,7 @@ def create(app, service_instance, message, volume_mount_services_enabled, accept

   binding.save_with_new_operation({ type: 'create', state: 'in progress', broker_provided_operation: binding_result[:operation] })
   job = Jobs::Services::ServiceBindingStateFetch.new(binding.guid, @user_audit_info, message.audit_hash)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(job)
+  Jobs::GenericEnqueuer.shared.enqueue(job)
   Repositories::ServiceBindingEventRepository.record_start_create(binding, @user_audit_info, message.audit_hash, manifest_triggered: @manifest_triggered)
 else
   binding.save
4 changes: 2 additions & 2 deletions app/actions/v2/services/service_binding_delete.rb
@@ -19,7 +19,7 @@ def foreground_delete_request(service_binding)
 end

 def background_delete_request(service_binding)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(Jobs::DeleteActionJob.new(ServiceBinding, service_binding.guid, self))
+  Jobs::GenericEnqueuer.shared.enqueue(Jobs::DeleteActionJob.new(ServiceBinding, service_binding.guid, self))
 end

 def delete(service_bindings)
@@ -39,7 +39,7 @@ def delete(service_bindings)
   service_binding.save_with_new_operation({ type: 'delete', state: 'in progress', broker_provided_operation: broker_response[:operation] })

   job = VCAP::CloudController::Jobs::Services::ServiceBindingStateFetch.new(service_binding.guid, @user_audit_info, {})
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(job)
+  Jobs::GenericEnqueuer.shared.enqueue(job)
   Repositories::ServiceBindingEventRepository.record_start_delete(service_binding, @user_audit_info)
 else
   service_binding.destroy
2 changes: 1 addition & 1 deletion app/actions/v3/service_broker_update.rb
@@ -58,7 +58,7 @@ def enqueue_update
     previous_broker_state,
     user_audit_info: service_event_repository.user_audit_info
   )
-  pollable_job = Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(synchronization_job)
+  pollable_job = Jobs::GenericEnqueuer.shared.enqueue_pollable(synchronization_job)
 end

 pollable_job
2 changes: 1 addition & 1 deletion app/actions/v3/service_instance_update_managed.rb
@@ -83,7 +83,7 @@ def enqueue_update
     user_audit_info: user_audit_info,
     audit_hash: message.audit_hash
   )
-  pollable_job = Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue_pollable(update_job)
+  pollable_job = Jobs::GenericEnqueuer.shared.enqueue_pollable(update_job)
   lock.asynchronous_unlock!
 ensure
   lock.unlock_and_fail! if lock.present? && lock.needs_unlock?
8 changes: 8 additions & 0 deletions app/jobs/cc_job.rb
@@ -1,6 +1,14 @@
 module VCAP::CloudController
   module Jobs
     class CCJob
+      def before(delayed_job)
+        GenericEnqueuer.shared(priority: delayed_job.priority)
+      end
+
+      def after(_delayed_job)
+        GenericEnqueuer.reset!
+      end
+
       def reschedule_at(time, attempts)
         time + (attempts**4) + 5
       end
3 changes: 3 additions & 0 deletions app/jobs/delete_action_job.rb
@@ -55,6 +55,9 @@ def delete_action_can_return_warnings?
         delete_action.respond_to?(:can_return_warnings?) && delete_action.can_return_warnings?
       end

+      # def delete_action_can_enqueue_jobs?
+      #   delete_action.respond_to?(:can_enqueue_jobs?) && delete_action.can_enqueue_jobs?
+      # end
       attr_reader :model_class, :delete_action
     end
   end
30 changes: 21 additions & 9 deletions app/jobs/enqueuer.rb
@@ -16,16 +16,16 @@ def initialize(opts={})
   load_delayed_job_plugins
 end

-def enqueue(job)
-  enqueue_job(job)
+def enqueue(job, run_at: nil, priority_increment: nil)
+  enqueue_job(job, run_at:, priority_increment:)
 end

-def enqueue_pollable(job, existing_guid: nil)
+def enqueue_pollable(job, existing_guid: nil, run_at: nil, priority_increment: nil, preserve_priority: false)
   wrapped_job = PollableJobWrapper.new(job, existing_guid:)

   wrapped_job = yield wrapped_job if block_given?

-  delayed_job = enqueue_job(wrapped_job)
+  delayed_job = enqueue_job(wrapped_job, run_at:, priority_increment:, preserve_priority:)
   PollableJobModel.find_by_delayed_job(delayed_job)
 end

@@ -37,14 +37,26 @@ def run_inline(job)

 private

-def enqueue_job(job)
+def enqueue_job(job, run_at: nil, priority_increment: nil, preserve_priority: false)
   @opts['guid'] = SecureRandom.uuid
   request_id = ::VCAP::Request.current_id
   timeout_job = TimeoutJob.new(job, job_timeout(job))
   logging_context_job = LoggingContextJob.new(timeout_job, request_id)
-  job_priority = job_priority(job)
-  @opts[:priority] = job_priority unless @opts[:priority] || job_priority.nil?
-  Delayed::Job.enqueue(logging_context_job, @opts)
+
+  base_priority = @opts[:priority] || 0
+  priority_from_config = get_overwritten_job_priority_from_config(job) || 0
+
+  final_priority = base_priority
+  final_priority += priority_from_config unless preserve_priority
+  final_priority += [priority_increment, 0].max if priority_increment && !preserve_priority
+
+  local_opts = {}
+  # DelayedJob might have a different default priority. In the context of the enqueuer, we consider 0 as the default priority.
+  # Thus, we only set the priority if we use a non-default priority.
+  local_opts[:priority] = final_priority if final_priority > 0
+  local_opts[:run_at] = run_at if run_at
+
+  Delayed::Job.enqueue(logging_context_job, @opts.merge(local_opts))
 end

 def load_delayed_job_plugins
@@ -58,7 +70,7 @@ def job_timeout(job)
   @timeout_calculator.calculate(unwrapped_job.try(:job_name_in_configuration))
 end

-def job_priority(job)
+def get_overwritten_job_priority_from_config(job)
   unwrapped_job = unwrap_job(job)
   @priority_overwriter.get(unwrapped_job.try(:display_name)) ||
     @priority_overwriter.get(unwrapped_job.try(:job_name_in_configuration)) ||
20 changes: 20 additions & 0 deletions app/jobs/generic_enqueuer.rb
@@ -0,0 +1,20 @@
+module VCAP::CloudController
+  module Jobs
+    REDUCED_PRIORITY = 50
+
+    class GenericEnqueuer < Enqueuer
+      def self.shared(priority: nil)
+        stored_instance = Thread.current[:generic_enqueuer]
+        return stored_instance if stored_instance && priority.nil?
+
+        new_instance = new(queue: Jobs::Queues.generic, priority: priority)
+        Thread.current[:generic_enqueuer] ||= new_instance
+        new_instance
+      end
+
+      def self.reset!
+        Thread.current[:generic_enqueuer] = nil
+      end
+    end
+  end
+end
13 changes: 4 additions & 9 deletions app/jobs/reoccurring_job.rb
@@ -13,7 +13,7 @@ def success(current_delayed_job)
   elsif next_enqueue_would_exceed_maximum_duration?
     expire!
   else
-    enqueue_next_job(pollable_job, current_delayed_job.priority)
+    enqueue_next_job(pollable_job)
   end
 end

@@ -75,15 +75,10 @@ def expire!
         raise CloudController::Errors::ApiError.new_from_details('JobTimeout')
       end

-      def enqueue_next_job(pollable_job, priority)
-        opts = {
-          queue: Jobs::Queues.generic,
-          run_at: Delayed::Job.db_time_now + next_execution_in,
-          priority: priority
-        }
-
+      def enqueue_next_job(pollable_job)
+        run_at = Delayed::Job.db_time_now + next_execution_in
         @retry_number += 1
-        Jobs::Enqueuer.new(opts).enqueue_pollable(self, existing_guid: pollable_job.guid)
+        Jobs::GenericEnqueuer.shared.enqueue_pollable(self, existing_guid: pollable_job.guid, run_at: run_at, preserve_priority: true)
       end
     end
   end
4 changes: 2 additions & 2 deletions app/jobs/runtime/expired_blob_cleanup.rb
@@ -23,11 +23,11 @@ def max_attempts
 end

 def enqueue_droplet_delete_job(droplet_guid)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(Jobs::Runtime::DeleteExpiredDropletBlob.new(droplet_guid))
+  Jobs::GenericEnqueuer.shared.enqueue(Jobs::Runtime::DeleteExpiredDropletBlob.new(droplet_guid))
 end

 def enqueue_package_delete_job(package_guid)
-  Jobs::Enqueuer.new(queue: Jobs::Queues.generic).enqueue(Jobs::Runtime::DeleteExpiredPackageBlob.new(package_guid))
+  Jobs::GenericEnqueuer.shared.enqueue(Jobs::Runtime::DeleteExpiredPackageBlob.new(package_guid))
 end

 def logger
4 changes: 2 additions & 2 deletions app/jobs/v2/services/asynchronous_operations.rb
@@ -23,9 +23,9 @@ def retry_job(retry_after_header: '')
 end

 def enqueue_again
-  opts = { queue: Jobs::Queues.generic, run_at: Delayed::Job.db_time_now + next_execution_in }
+  run_at_time = Delayed::Job.db_time_now + next_execution_in
   self.retry_number += 1
-  Jobs::Enqueuer.new(opts).enqueue(self)
+  Jobs::GenericEnqueuer.shared.enqueue(self, run_at: run_at_time)
 end

 def default_polling_exponential_backoff
9 changes: 3 additions & 6 deletions lib/services/service_brokers/v2/orphan_mitigator.rb
@@ -13,8 +13,7 @@ def cleanup_failed_provision(service_instance)
     service_instance.service_plan.guid
   )

-  opts = { queue: VCAP::CloudController::Jobs::Queues.generic, run_at: Delayed::Job.db_time_now }
-  VCAP::CloudController::Jobs::Enqueuer.new(opts).enqueue(orphan_deprovision_job)
+  VCAP::CloudController::Jobs::GenericEnqueuer.shared.enqueue(orphan_deprovision_job, run_at: Delayed::Job.db_time_now)
 end

 def cleanup_failed_bind(service_binding)
@@ -24,8 +23,7 @@ def cleanup_failed_bind(service_binding)
     binding_info
   )

-  opts = { queue: VCAP::CloudController::Jobs::Queues.generic, run_at: Delayed::Job.db_time_now }
-  VCAP::CloudController::Jobs::Enqueuer.new(opts).enqueue(unbind_job)
+  VCAP::CloudController::Jobs::GenericEnqueuer.shared.enqueue(unbind_job, run_at: Delayed::Job.db_time_now)
 end

 def cleanup_failed_key(service_key)
@@ -35,8 +33,7 @@ def cleanup_failed_key(service_key)
           service_key.service_instance.guid
         )

-        opts = { queue: VCAP::CloudController::Jobs::Queues.generic, run_at: Delayed::Job.db_time_now }
-        VCAP::CloudController::Jobs::Enqueuer.new(opts).enqueue(key_delete_job)
+        VCAP::CloudController::Jobs::GenericEnqueuer.shared.enqueue(key_delete_job, run_at: Delayed::Job.db_time_now)
       end
     end
   end
6 changes: 3 additions & 3 deletions spec/unit/actions/buildpack_delete_spec.rb
@@ -36,9 +36,9 @@ module VCAP::CloudController
 it 'first deletes the database record and afterwards the blob' do
   expect(buildpack).to receive(:destroy).ordered
   expect(Jobs::Runtime::BlobstoreDelete).to receive(:new).ordered
-  enqueue_job_dbl = double('Jobs::Enqueuer')
-  expect(Jobs::Enqueuer).to receive(:new).and_return(enqueue_job_dbl).ordered
-  expect(enqueue_job_dbl).to receive(:enqueue).ordered
+  generic_enqueuer_dbl = double('Jobs::GenericEnqueuer')
+  expect(Jobs::GenericEnqueuer).to receive(:shared).and_return(generic_enqueuer_dbl).ordered
+  expect(generic_enqueuer_dbl).to receive(:enqueue).ordered

   buildpack_delete.delete([buildpack])
 end