begin
  # Optional: load 'sidekiq/batch' when some installed gem provides it, so that
  # `configure` below can detect a competing ::Sidekiq::Batch implementation.
  require 'sidekiq/batch'
rescue LoadError
  # Not installed; nothing to do.
end

module CanvasSync
  module JobBatches
    # Sidekiq integration for JobBatches: worker helpers, the callback worker,
    # the client/server middleware, and the `configure` hook that installs them.
    module Sidekiq
      # Helpers exposing the batch stored in Thread.current[:batch].
      # `configure` mixes this module into ::Sidekiq::Worker.
      module WorkerExtension
        # @return the bid of the batch set on the current thread
        # @raise [NoMethodError] when no batch is set on the current thread
        def bid
          Thread.current[:batch].bid
        end

        # @return the batch set on the current thread, or nil when there is none
        def batch
          Thread.current[:batch]
        end

        # @return the current batch's context, or an empty Hash when there is no
        #   batch (or its context is nil/false)
        def batch_context
          batch&.context || {}
        end

        # Delegates to `valid?` on the current batch.
        #
        # @raise [NoMethodError] when no batch is set on the current thread
        def valid_within_batch?
          batch.valid?
        end
      end

      # Sidekiq worker registered as Batch::Callback.worker_class by `configure`.
      # Its behaviour comes from Batch::Callback::CallbackWorkerCommon (defined
      # outside this file).
      class SidekiqCallbackWorker
        include ::Sidekiq::Worker
        include WorkerExtension
        include Batch::Callback::CallbackWorkerCommon

        # Bulk-enqueues one job of this class per entry in +args+.
        #
        # @param args [Array] argument lists, one per job; nothing is pushed when empty
        # @param queue the queue name passed through to Sidekiq
        # @return nil when +args+ is empty, otherwise the result of push_bulk
        def self.enqueue_all(args, queue)
          return if args.empty?

          ::Sidekiq::Client.push_bulk(
            'class' => self,
            'args' => args,
            'queue' => queue
          )
        end
      end

      # Client middleware: tags jobs pushed while a batch is active on the current
      # thread with that batch's bid and registers their jid with the batch.
      class ClientMiddleware
        def call(_worker, msg, _queue, _redis_pool = nil)
          if (batch = Thread.current[:batch]) && should_handle_batch?(msg)
            # NOTE(review): the bid is stored under the Symbol key :bid while the
            # server side reads msg['bid']; presumably the key is stringified when
            # the job is serialized - confirm before changing.
            batch.increment_job_queue(msg['jid']) if (msg[:bid] = batch.bid)
          end
          yield
        end

        # @param msg [Hash] the job payload
        # @return [Boolean] false for ActiveJob-wrapped jobs whose wrapped class is
        #   a subclass of BatchAwareJob, true for everything else
        def should_handle_batch?(msg)
          wrapped_by_active_job = msg['class'] == 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper'
          # Presumably BatchAwareJob subclasses register themselves with the batch
          # elsewhere - verify against BatchAwareJob.
          return false if wrapped_by_active_job && msg['wrapped'].constantize < BatchAwareJob

          true
        end
      end

      # Server middleware: runs jobs that carry a 'bid' with the matching batch
      # set on Thread.current[:batch] and reports the outcome to Batch.
      class ServerMiddleware
        def call(_worker, msg, _queue)
          if (bid = msg['bid'])
            begin
              Thread.current[:batch] = Batch.new(bid)
              yield
              # Clear the thread-local before reporting success.
              Thread.current[:batch] = nil
              Batch.process_successful_job(bid, msg['jid'])
            rescue
              # Any StandardError (from the job or from the success bookkeeping)
              # is recorded as a failure and then re-raised to Sidekiq.
              Batch.process_failed_job(bid, msg['jid'])
              raise
            ensure
              Thread.current[:batch] = nil
            end
          else
            yield
          end
        end
      end

      # Installs the JobBatches middleware and aliases into Sidekiq:
      # - prints a warning when a different ::Sidekiq::Batch is already defined,
      # - swaps any ::Sidekiq::Batch middleware for the JobBatches middleware on
      #   both the client and the server,
      # - registers a death handler that reports dead batch jobs,
      # - points ::Sidekiq::Batch at JobBatches::Batch, mixes WorkerExtension into
      #   ::Sidekiq::Worker and registers SidekiqCallbackWorker for callbacks.
      def self.configure
        if defined?(::Sidekiq::Batch) && ::Sidekiq::Batch != JobBatches::Batch
          # NOTE(review): `print` emits no trailing newline; left unchanged.
          print "WARNING: Detected Sidekiq Pro or sidekiq-batch. CanvasSync JobBatches may not be fully compatible!"
        end

        ::Sidekiq.configure_client do |config|
          config.client_middleware do |chain|
            chain.remove ::Sidekiq::Batch::Middleware::ClientMiddleware if defined?(::Sidekiq::Batch::Middleware::ClientMiddleware)
            chain.add JobBatches::Sidekiq::ClientMiddleware
          end
        end

        ::Sidekiq.configure_server do |config|
          config.client_middleware do |chain|
            chain.remove ::Sidekiq::Batch::Middleware::ClientMiddleware if defined?(::Sidekiq::Batch::Middleware::ClientMiddleware)
            chain.add JobBatches::Sidekiq::ClientMiddleware
          end

          config.server_middleware do |chain|
            chain.remove ::Sidekiq::Batch::Middleware::ServerMiddleware if defined?(::Sidekiq::Batch::Middleware::ServerMiddleware)
            chain.add JobBatches::Sidekiq::ServerMiddleware
          end

          config.death_handlers << ->(job, _ex) do
            next unless job['bid'].present?

            # `Batch` is resolved lexically to JobBatches::Batch. Writing
            # `Sidekiq::Batch` here would resolve `Sidekiq` to this module
            # (JobBatches::Sidekiq) and look for JobBatches::Sidekiq::Batch,
            # which this file never defines.
            if defined?(::Apartment)
              ::Apartment::Tenant.switch(job['apartment'] || 'public') do
                Batch.process_dead_job(job['bid'], job['jid'])
              end
            else
              Batch.process_dead_job(job['bid'], job['jid'])
            end
          end
        end

        ::Sidekiq.const_set(:Batch, CanvasSync::JobBatches::Batch)
        # This alias helps apartment-sidekiq set itself up correctly
        ::Sidekiq::Batch.const_set(:Server, CanvasSync::JobBatches::Sidekiq::ServerMiddleware)
        ::Sidekiq::Worker.send(:include, JobBatches::Sidekiq::WorkerExtension)
        Batch::Callback.worker_class = SidekiqCallbackWorker
      end
    end
  end
end