begin
  require 'sidekiq/batch'
rescue LoadError
  # The library is optional here: a failed load is deliberately ignored.
end

module CanvasSync::JobBatches
  module Compat
    # Sidekiq integration for JobBatches: worker helpers, client/server
    # middleware, and the one-time installation performed by .configure.
    module Sidekiq
      # Batch accessors that .configure mixes into ::Sidekiq::Worker.
      module WorkerExtension
        # Returns the bid of the batch stored for the current thread, or nil
        # when no batch is stored (nil-safe, like #batch_context).
        def bid
          batch&.bid
        end

        # Returns whatever is stored under CURRENT_BATCH_THREAD_KEY for the
        # current thread (nil when nothing is stored).
        def batch
          Thread.current[CURRENT_BATCH_THREAD_KEY]
        end

        # Returns the current batch's context, or an empty Hash when there is
        # no current batch or its context is nil/false.
        def batch_context
          batch&.context || {}
        end

        # Delegates to the current batch's #valid?.
        # NOTE: raises NoMethodError when no batch is stored for the thread.
        def valid_within_batch?
          batch.valid?
        end
      end

      # Sidekiq worker registered as Batch::Callback.worker_class by .configure.
      class SidekiqCallbackWorker
        include ::Sidekiq::Worker
        include WorkerExtension
        include Batch::Callback::CallbackWorkerCommon

        # Pushes one job of this class per entry in +args+ onto +queue+ with a
        # single bulk push. Does nothing (returns nil) when +args+ is empty.
        def self.enqueue_all(args, queue)
          return if args.empty?

          ::Sidekiq::Client.push_bulk(
            'class' => self,
            'args' => args,
            'queue' => queue
          )
        end
      end

      # Client middleware: stamps the current thread's batch id onto outgoing
      # job payloads and records the job's jid with the batch.
      class ClientMiddleware
        include ::Sidekiq::ClientMiddleware if defined? ::Sidekiq::ClientMiddleware

        def call(_worker, msg, _queue, _redis_pool = nil)
          if (batch = Thread.current[CURRENT_BATCH_THREAD_KEY]) && should_handle_batch?(msg)
            # The assignment is intentional: msg['bid'] is set to the batch's
            # bid, and the job is only counted when that bid is truthy.
            batch.increment_job_queue(msg['jid']) if (msg['bid'] = batch.bid)
          end
          yield
        end

        # False for payloads recognised by Compat::Sidekiq.is_activejob_job?;
        # true for everything else.
        def should_handle_batch?(msg)
          !CanvasSync::JobBatches::Compat::Sidekiq.is_activejob_job?(msg)
        end
      end

      # Server middleware: exposes the job's batch through the thread-local
      # while the job runs and reports the job's outcome to Batch.
      class ServerMiddleware
        include ::Sidekiq::ServerMiddleware if defined? ::Sidekiq::ServerMiddleware

        def call(_worker, msg, _queue)
          if (bid = msg['bid'])
            prev_batch = Thread.current[CURRENT_BATCH_THREAD_KEY]
            begin
              Thread.current[CURRENT_BATCH_THREAD_KEY] = Batch.new(bid)
              yield
              Thread.current[CURRENT_BATCH_THREAD_KEY].save_context_changes
              Batch.process_successful_job(bid, msg['jid'])
            rescue
              # Any StandardError raised above marks the job as failed for the
              # batch; the error is then re-raised unchanged.
              Batch.process_failed_job(bid, msg['jid'])
              raise
            ensure
              # Always restore whatever batch was current before this job.
              Thread.current[CURRENT_BATCH_THREAD_KEY] = prev_batch
            end
          else
            yield
          end
        end
      end

      # Truthy when +msg+ is a Sidekiq payload whose class is the ActiveJob
      # Sidekiq adapter's JobWrapper and whose 'wrapped' class is a subclass of
      # Compat::ActiveJob::BatchAwareJob. Returns false when ::ActiveJob is not
      # defined. The result may be nil, since Module#< returns nil for
      # unrelated classes.
      def self.is_activejob_job?(msg)
        return false unless defined?(::ActiveJob)

        msg['class'] == 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper' &&
          (msg['wrapped'].to_s).constantize < CanvasSync::JobBatches::Compat::ActiveJob::BatchAwareJob
      end

      # Yields inside ::Apartment::Tenant.switch for the job's 'apartment'
      # value (falling back to 'public') when ::Apartment is defined;
      # otherwise yields directly.
      def self.switch_tenant(job)
        if defined?(::Apartment)
          ::Apartment::Tenant.switch(job['apartment'] || 'public') do
            yield
          end
        else
          yield
        end
      end

      # Passes the "<placement>_middleware" chain to +blk+.
      # :client placement is registered through both configure_client and
      # configure_server; any other placement only through configure_server.
      def self.sidekiq_middleware(placement, &blk)
        install_middleware = ->(config) do
          config.send("#{placement}_middleware") do |chain|
            blk.call(chain)
          end
        end

        ::Sidekiq.configure_client(&install_middleware) if placement == :client
        ::Sidekiq.configure_server(&install_middleware)
      end

      # Installs the JobBatches integration into Sidekiq. Guarded by
      # @already_configured, so only the first call has any effect.
      def self.configure
        return if @already_configured
        @already_configured = true

        if defined?(::Sidekiq::Batch) && ::Sidekiq::Batch != CanvasSync::JobBatches::Batch
          # puts (not print) so the warning ends with a newline and does not
          # run into subsequent output.
          puts "WARNING: Detected Sidekiq Pro or sidekiq-batch. CanvasSync JobBatches may not be fully compatible!"
        end

        sidekiq_middleware(:client) do |chain|
          chain.remove ::Sidekiq::Batch::Middleware::ClientMiddleware if defined?(::Sidekiq::Batch::Middleware::ClientMiddleware)
          chain.add CanvasSync::JobBatches::Compat::Sidekiq::ClientMiddleware
        end

        sidekiq_middleware(:server) do |chain|
          chain.remove ::Sidekiq::Batch::Middleware::ServerMiddleware if defined?(::Sidekiq::Batch::Middleware::ServerMiddleware)
          chain.add CanvasSync::JobBatches::Compat::Sidekiq::ServerMiddleware
        end

        ::Sidekiq.configure_server do |config|
          # Death handler: inside the job's tenant, batch-aware ActiveJob
          # payloads are handed to Compat::ActiveJob; any other job carrying
          # a bid is reported to the batch as dead.
          config.death_handlers << ->(job, ex) do
            switch_tenant(job) do
              if is_activejob_job?(job)
                CanvasSync::JobBatches::Compat::ActiveJob.handle_job_death(job["args"][0], ex)
              elsif job['bid'].present?
                ::Sidekiq::Batch.process_dead_job(job['bid'], job['jid'])
              end
            end
          end
        end

        ::Sidekiq.const_set(:Batch, CanvasSync::JobBatches::Batch)
        # This alias helps apartment-sidekiq set itself up correctly
        ::Sidekiq::Batch.const_set(:Server, CanvasSync::JobBatches::Compat::Sidekiq::ServerMiddleware)
        ::Sidekiq::Worker.include(CanvasSync::JobBatches::Compat::Sidekiq::WorkerExtension)
        Batch::Callback.worker_class = SidekiqCallbackWorker
      end
    end
  end
end

require_relative 'sidekiq/web'