lib/good_job/bulk.rb in good_job-3.9.0 vs lib/good_job/bulk.rb in good_job-3.10.0
- old
+ new
@@ -13,40 +13,34 @@
# Capture jobs to a buffer. Pass either a block, or specific Active Jobs to be buffered.
# @param active_jobs [Array<ActiveJob::Base>] Active Jobs to be buffered.
# @param queue_adapter Override the jobs implict queue adapter with an explicit one.
# @return [nil, Array<ActiveJob::Base>] The ActiveJob instances that have been buffered; nil if no active buffer
- def self.capture(active_jobs = nil, queue_adapter: nil)
- raise(ArgumentError, "Use either the block form or the argument form, not both") if block_given? && active_jobs
+ def self.capture(active_jobs = nil, queue_adapter: nil, &block)
+ raise(ArgumentError, "Use either the block form or the argument form, not both") if block && active_jobs
- if block_given?
- begin
- original_buffer = current_buffer
- self.current_buffer = Buffer.new
- yield
- current_buffer.active_jobs
- ensure
- self.current_buffer = original_buffer
- end
- else
+ if block
+ buffer = Buffer.new
+ buffer.capture(&block)
+ buffer.active_jobs
+ elsif current_buffer
current_buffer&.add(active_jobs, queue_adapter: queue_adapter)
end
end
# Capture jobs to a buffer and enqueue them all at once; or enqueue the current buffer.
# @param active_jobs [Array<ActiveJob::Base>] Active Jobs to be enqueued.
# @return [Array<ActiveJob::Base>] The ActiveJob instances that have been captured; check provider_job_id to confirm enqueued.
- def self.enqueue(active_jobs = nil)
- raise(ArgumentError, "Use either the block form or the argument form, not both") if block_given? && active_jobs
+ def self.enqueue(active_jobs = nil, &block)
+ raise(ArgumentError, "Use either the block form or the argument form, not both") if block && active_jobs
- if block_given?
- capture do
- yield
- current_buffer&.enqueue
- end
+ buffer = Buffer.new
+ if block
+ buffer.capture(&block)
+ buffer.enqueue
+ buffer.active_jobs
elsif active_jobs.present?
- buffer = Buffer.new
buffer.add(active_jobs)
buffer.enqueue
buffer.active_jobs
elsif current_buffer.present?
current_buffer.enqueue
@@ -67,11 +61,21 @@
class Buffer
def initialize
@values = []
end
+ def capture
+ original_buffer = Bulk.current_buffer
+ Bulk.current_buffer = self
+ yield
+ ensure
+ Bulk.current_buffer = original_buffer
+ end
+
def add(active_jobs, queue_adapter: nil)
new_pairs = Array(active_jobs).map do |active_job|
+ raise ArgumentError, "Expected an ActiveJob::Base instance, got #{active_job.class}" unless active_job.is_a?(ActiveJob::Base)
+
adapter = queue_adapter || active_job.class.queue_adapter
raise Error, "Jobs must have a Queue Adapter" unless adapter
[active_job, adapter]
end