require 'sidekiq/circuit_breaker/manager' require 'sidekiq/circuit_breaker/middleware' require 'sidekiq/client' module Sidekiq module CircuitBreaker module Middleware class Client def call(worker_class, msg, queue, redis_pool) begin worker = constantize(worker_class) rescue NameError return yield end circuit_breaker = worker.respond_to?(:sidekiq_circuit_breaker_enabled?) return yield unless circuit_breaker options = worker.sidekiq_circuit_breaker_options scope = extract_scope(options, msg) || worker_class mgr = CircuitBreaker::Manager.new(scope, options) if mgr.open? && msg['at'].nil? msg['at'] = (Time.now + (mgr.time_to_open + additional_seconds)).to_f end yield end private def additional_seconds rand(3..10) end private def extract_scope(options, msg) scope = options.scope return scope if scope.is_a?(String) return unless scope.respond_to?(:call) options.scope.call(*msg['args']) end def constantize(str) names = str.split('::') names.shift if names.empty? || names.first.empty? names.inject(Object) do |constant, name| constant.const_defined?(name, false) ? constant.const_get(name, false) : constant.const_missing(name) end end end class Server def call(worker, msg, queue) circuit_breaker_enabled = worker.class.respond_to?(:sidekiq_circuit_breaker_enabled?) return yield unless circuit_breaker_enabled def worker.perform(*args) manager = sidekiq_circuit_breaker_manager(args) if manager.open? begin Sidekiq::Client.push( 'class' => self.class, 'args' => args ) return end end super(*args) end manager = worker.sidekiq_circuit_breaker_manager(msg['args']) begin yield rescue => e manager.evaluate_failure raise e end manager.register_success end end end end end Sidekiq.configure_client do |config| config.client_middleware do |chain| chain.add Sidekiq::CircuitBreaker::Middleware::Client end end Sidekiq.configure_server do |config| config.client_middleware do |chain| chain.add Sidekiq::CircuitBreaker::Middleware::Client end config.server_middleware do |chain| chain.add Sidekiq::CircuitBreaker::Middleware::Server end end