module CanvasSync::JobUniqueness module Compat module Sidekiq module WorkerExtension extend ActiveSupport::Concern include UniqueJobCommon class_methods do def ensure_uniqueness(**kwargs) super(**kwargs) if !(defined?(@@validated_config) && @@validated_config) Compat::Sidekiq.validate_middleware_placement!() @@validated_config = true end end end end class SidekiqLockContext < LockContext def job_scheduled_at @job_instance&.[]("at") end def reenqueue(schedule_in:) job_class.set(queue: job_queue.to_sym).perform_in(schedule_in, *@job_instance["args"]) end end class CommonMiddleware def lock_context(msg) opts = worker_uniqueness(msg) return nil unless opts SidekiqLockContext.new({ job_clazz: msg['class'], jid: msg['jid'], queue: msg['queue'], args: msg['args'], # kwargs: msg['kwargs'], **(msg['uniqueness_cache_data'] || {}), }, job_instance: msg) end def worker_uniqueness(msg) return nil if Compat::Sidekiq.is_activejob_job?(msg) worker_class = msg['class'].constantize return nil unless worker_class.respond_to?(:unique_job_options) worker_class.unique_job_options end end class ClientMiddleware < CommonMiddleware include ::Sidekiq::ClientMiddleware if defined? ::Sidekiq::ClientMiddleware def call(_worker, msg, _queue, _redis_pool = nil, &blk) ctx = lock_context(msg) return blk.call unless ctx msg['uniqueness_cache_data'] = ctx.cache_data ctx.handle_lifecycle!(:enqueue, &blk) end end class ServerMiddleware < CommonMiddleware include ::Sidekiq::ServerMiddleware if defined? ::Sidekiq::ServerMiddleware def call(_worker, msg, _queue, &blk) ctx = lock_context(msg) return blk.call unless ctx ctx.handle_lifecycle!(:perform, &blk) end end def self.is_activejob_job?(msg) return false unless defined?(::ActiveJob) msg['class'] == 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper' && (msg['wrapped'].to_s).constantize < Compat::ActiveJob::UniqueJobExtension end def self.validate_middleware_order(chain, order) chain_classes = chain.entries.map(&:klass) filtered = chain_classes.select { |klass| order.include?(klass) } raise "Middleware chain does not contain all required middleware: #{order - filtered}" unless order.all? { |klass| filtered.include?(klass) } raise "Middleware must be in order: #{order.inspect}" if filtered != order end def self.sidekiq_middleware(placement, &blk) install_middleware = ->(config) do config.send("#{placement}_middleware") do |chain| blk.call(chain) end end ::Sidekiq.configure_client(&install_middleware) if placement == :client ::Sidekiq.configure_server(&install_middleware) end def self.validate_middleware_placement! sidekiq_middleware(:client) do |chain| # Unique middleware must come _before_ the Batch middleware so that the uniqueness middleware can wrap the job in a batch validate_middleware_order(chain, [ CanvasSync::JobUniqueness::Compat::Sidekiq::ClientMiddleware, CanvasSync::JobBatches::Compat::Sidekiq::ClientMiddleware, ]) end sidekiq_middleware(:server) do |chain| # Unique middleware must com _after_ the Batch middleware so that the Batch is loaded before reaching the uniqueness middleware validate_middleware_order(chain, [ CanvasSync::JobBatches::Compat::Sidekiq::ServerMiddleware, CanvasSync::JobUniqueness::Compat::Sidekiq::ServerMiddleware, ]) end end def self.configure sidekiq_middleware(:client) do |chain| chain.insert_before CanvasSync::JobBatches::Compat::Sidekiq::ClientMiddleware, Compat::Sidekiq::ClientMiddleware end sidekiq_middleware(:server) do |chain| chain.insert_after CanvasSync::JobBatches::Compat::Sidekiq::ServerMiddleware, Compat::Sidekiq::ServerMiddleware end ::Sidekiq::Worker.extend(ActiveSupport::Concern) unless ::Sidekiq::Worker < ActiveSupport::Concern ::Sidekiq::Worker.send(:include, Compat::Sidekiq::WorkerExtension) end end end end # require_relative 'sidekiq/web'