Sha256: bb1076e9106b8374744c03cfc31eb7f12a1856cbf114f814b0e23080460ec5b9

Contents?: true

Size: 1.1 KB

Versions: 4

Compression:

Stored size: 1.1 KB

Contents

module Sidekiq
  module Grouping
    class Middleware
      def call(worker_class, msg, queue, redis_pool = nil)
        return yield if (defined?(Sidekiq::Testing) && Sidekiq::Testing.inline?)

        worker_class = worker_class.camelize.constantize if worker_class.is_a?(String)
        options = worker_class.get_sidekiq_options

        batch =
          options.key?('batch_flush_size') ||
          options.key?('batch_flush_interval') ||
          options.key?('batch_size')

        passthrough =
          msg['args'] &&
          msg['args'].is_a?(Array) &&
          msg['args'].try(:first) == true

        retrying = msg["failed_at"].present?

        return yield unless batch

        if !(passthrough || retrying)
          add_to_batch(worker_class, queue, msg, redis_pool)
        else
          msg['args'].shift if passthrough
          yield
        end
      end

      private

      def add_to_batch(worker_class, queue, msg, redis_pool = nil)
        Sidekiq::Grouping::Batch
          .new(worker_class.name, queue, redis_pool)
          .add(msg['args'])

        nil
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
sidekiq-grouping-1.1.0 lib/sidekiq/grouping/middleware.rb
sidekiq-grouping-1.0.10 lib/sidekiq/grouping/middleware.rb
sidekiq-grouping-1.0.9 lib/sidekiq/grouping/middleware.rb
sidekiq-grouping-1.0.8 lib/sidekiq/grouping/middleware.rb