module CanvasSync::JobBatches
  module Compat
    # Glue between the JobBatches machinery and ActiveJob: tracks which batch a
    # job belongs to, reports job outcomes back to the batch, and provides an
    # ActiveJob-based worker for batch callbacks.
    module ActiveJob
      # Mixin that makes a job batch-aware. The owning batch id is kept in
      # @bid, travels with the job through serialize/deserialize, and the
      # batch is exposed as the thread's current batch while the job performs.
      module BatchAwareJob
        extend ActiveSupport::Concern

        included do
          # While a batched job performs, install its batch as the thread's
          # current batch, then report the outcome to Batch.
          around_perform do |_job, block|
            # This _must_ be @bid - not just bid: the `bid` reader falls back
            # to the thread's current batch, whereas only @bid says that this
            # job itself was enqueued as part of a batch.
            next block.call unless @bid

            outer_batch = Thread.current[CURRENT_BATCH_THREAD_KEY]
            begin
              Thread.current[CURRENT_BATCH_THREAD_KEY] = Batch.new(@bid)
              block.call
              Thread.current[CURRENT_BATCH_THREAD_KEY].save_context_changes
              Batch.process_successful_job(@bid, job_id)
            rescue
              # Record the failure, then let the error propagate unchanged.
              Batch.process_failed_job(@bid, job_id)
              raise
            ensure
              # Always put back whatever batch was current before this job.
              Thread.current[CURRENT_BATCH_THREAD_KEY] = outer_batch
            end
          end

          # A job enqueued while a batch is current on this thread adopts that
          # batch's id and is registered with the batch before being enqueued.
          around_enqueue do |_job, block|
            enclosing_batch = Thread.current[CURRENT_BATCH_THREAD_KEY]
            if enclosing_batch
              @bid = enclosing_batch.bid
              enclosing_batch.append_jobs(job_id) if @bid
            end

            block.call
          end
        end

        # @return the batch id stored on this job, or else the bid of the
        #   thread's current batch (nil when neither is present)
        def bid
          @bid || Thread.current[CURRENT_BATCH_THREAD_KEY]&.bid
        end

        # @return the thread's current batch, or nil when none is set
        def batch
          Thread.current[CURRENT_BATCH_THREAD_KEY]
        end

        # @return the current batch's context, or an empty Hash when there is
        #   no current batch or its context is nil/false
        def batch_context
          batch&.context || {}
        end

        # Delegates to the current batch's `valid?`.
        # NOTE(review): there is no nil guard here, unlike #batch_context, so
        # this presumably must only be called while a batch is current - confirm.
        def valid_within_batch?
          batch.valid?
        end

        # Adds the batch id to the job data produced by `super`.
        #
        # @return the serialized job data, including a 'batch_id' entry
        def serialize
          job_data = super
          job_data['batch_id'] = @bid # This _must_ be @bid - not just bid
          job_data
        end

        # Restores the batch id that #serialize stored under 'batch_id'.
        #
        # @param data serialized job data
        def deserialize(data)
          super
          @bid = data['batch_id']
        end
      end

      # ActiveJob-backed worker used to run batch callbacks.
      class ActiveJobCallbackWorker < ::ActiveJob::Base
        include Batch::Callback::CallbackWorkerCommon

        # Enqueues one callback job per argument set on the given queue.
        #
        # @param args collection of argument lists, one per job
        # @param queue queue to enqueue the jobs on
        def self.enqueue_all(args, queue)
          args.each { |job_args| set(queue: queue).perform_later(*job_args) }
        end
      end

      # Tells the owning batch that a job has died.
      #
      # @param job either the raw argument Array of an
      #   ActiveSupport::Notifications callback, or job data that answers
      #   ["job_id"] and ["batch_id"]
      # @param error optional error; not used by the visible code
      def self.handle_job_death(job, error = nil)
        if job.is_a?(Array)
          # Notification arguments: rebuild the event to reach its payload.
          payload = ActiveSupport::Notifications::Event.new(*job).payload
          job = payload[:job].serialize
          error = payload[:error] # assigned but not read below
        end

        # Only jobs that carry both ids can be attributed to a batch.
        return unless job["job_id"].present? && job["batch_id"].present?

        CanvasSync::JobBatches::Batch.process_dead_job(job['batch_id'], job['job_id'])
      end

      # Wires batch support into ActiveJob: mixes BatchAwareJob into every job
      # class, listens for the job-death notifications, and installs the
      # ActiveJob callback worker unless a worker class is already set.
      def self.configure
        ::ActiveJob::Base.include(BatchAwareJob)

        begin
          %w[discard.active_job retry_stopped.active_job].each do |event_name|
            ActiveSupport::Notifications.subscribe(event_name) { |*args| handle_job_death(args) }
          end
        rescue => subscribe_error
          # Best effort: a failed subscription is logged, not raised.
          Rails.logger.warn(subscribe_error)
        end

        Batch::Callback.worker_class ||= ActiveJobCallbackWorker
      end
    end
  end
end