require "sidekiq/component"

module Sidekiq
  # A Sidekiq::Capsule is the set of resources necessary to
  # process one or more queues with a given concurrency.
  # One "default" Capsule is started but the user may declare additional
  # Capsules in the initializer.
  #
  # To process a "single" queue with one thread so jobs are processed
  # serially, you can do this:
  #
  # Sidekiq.configure_server do |config|
  #   config.capsule("single-threaded") do |cap|
  #     cap.concurrency = 1
  #     cap.queues = %w(single)
  #   end
  # end
  class Capsule
    include Sidekiq::Component

    attr_reader :name
    attr_reader :queues
    attr_accessor :concurrency

    def initialize(name, config)
      @name = name
      @config = config
      @queues = ["default"]
      @concurrency = config[:concurrency]
    end

    def fetcher
      @fetcher ||= begin
        inst = (config[:fetch_class] || Sidekiq::BasicFetch).new(self)
        inst.setup(config[:fetch_setup]) if inst.respond_to?(:setup)
        inst
      end
    end

    def stop
      fetcher&.bulk_requeue([], nil)
    end

    def queues=(val)
      @queues = Array(val).each_with_object([]) do |qstr, memo|
        arr = qstr
        arr = qstr.split(",") if qstr.is_a?(String)
        name, weight = arr
        [weight.to_i, 1].max.times do
          memo << name
        end
      end
    end

    # Allow the middleware to be different per-capsule.
    # Avoid if possible and add middleware globally so all
    # capsules share the same chains. Easier to debug that way.
    def client_middleware
      @client_chain ||= config.client_middleware.copy_for(self)
      yield @client_chain if block_given?
      @client_chain
    end

    def server_middleware
      @server_chain ||= config.server_middleware.copy_for(self)
      yield @server_chain if block_given?
      @server_chain
    end

    def redis_pool
      Thread.current[:sidekiq_redis_pool] || local_redis_pool
    end

    def local_redis_pool
      # connection pool is lazy, it will not create connections unless you actually need them
      # so don't be skimpy!
      @redis ||= config.new_redis_pool(@concurrency, name)
    end

    def redis
      raise ArgumentError, "requires a block" unless block_given?
      redis_pool.with do |conn|
        retryable = true
        begin
          yield conn
        rescue RedisClientAdapter::BaseError => ex
          # 2550 Failover can cause the server to become a replica, need
          # to disconnect and reopen the socket to get back to the primary.
          # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
          # 4985 Use the same logic when a blocking command is force-unblocked
          # The same retry logic is also used in client.rb
          if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
            conn.close
            retryable = false
            retry
          end
          raise
        end
      end
    end

    def lookup(name)
      config.lookup(name)
    end

    def logger
      config.logger
    end
  end
end