lib/good_job/bulk.rb in good_job-3.9.0 vs lib/good_job/bulk.rb in good_job-3.10.0

- old
+ new

@@ -13,40 +13,34 @@ # Capture jobs to a buffer. Pass either a block, or specific Active Jobs to be buffered. # @param active_jobs [Array<ActiveJob::Base>] Active Jobs to be buffered. # @param queue_adapter Override the jobs implict queue adapter with an explicit one. # @return [nil, Array<ActiveJob::Base>] The ActiveJob instances that have been buffered; nil if no active buffer - def self.capture(active_jobs = nil, queue_adapter: nil) - raise(ArgumentError, "Use either the block form or the argument form, not both") if block_given? && active_jobs + def self.capture(active_jobs = nil, queue_adapter: nil, &block) + raise(ArgumentError, "Use either the block form or the argument form, not both") if block && active_jobs - if block_given? - begin - original_buffer = current_buffer - self.current_buffer = Buffer.new - yield - current_buffer.active_jobs - ensure - self.current_buffer = original_buffer - end - else + if block + buffer = Buffer.new + buffer.capture(&block) + buffer.active_jobs + elsif current_buffer current_buffer&.add(active_jobs, queue_adapter: queue_adapter) end end # Capture jobs to a buffer and enqueue them all at once; or enqueue the current buffer. # @param active_jobs [Array<ActiveJob::Base>] Active Jobs to be enqueued. # @return [Array<ActiveJob::Base>] The ActiveJob instances that have been captured; check provider_job_id to confirm enqueued. - def self.enqueue(active_jobs = nil) - raise(ArgumentError, "Use either the block form or the argument form, not both") if block_given? && active_jobs + def self.enqueue(active_jobs = nil, &block) + raise(ArgumentError, "Use either the block form or the argument form, not both") if block && active_jobs - if block_given? - capture do - yield - current_buffer&.enqueue - end + buffer = Buffer.new + if block + buffer.capture(&block) + buffer.enqueue + buffer.active_jobs elsif active_jobs.present? - buffer = Buffer.new buffer.add(active_jobs) buffer.enqueue buffer.active_jobs elsif current_buffer.present? current_buffer.enqueue @@ -67,11 +61,21 @@ class Buffer def initialize @values = [] end + def capture + original_buffer = Bulk.current_buffer + Bulk.current_buffer = self + yield + ensure + Bulk.current_buffer = original_buffer + end + def add(active_jobs, queue_adapter: nil) new_pairs = Array(active_jobs).map do |active_job| + raise ArgumentError, "Expected an ActiveJob::Base instance, got #{active_job.class}" unless active_job.is_a?(ActiveJob::Base) + adapter = queue_adapter || active_job.class.queue_adapter raise Error, "Jobs must have a Queue Adapter" unless adapter [active_job, adapter] end