Sha256: bdb1caf4eb2165dbc62cb46efac79c74b316f553d573a39cb662fea326a7fbdf

Contents?: true

Size: 1020 Bytes

Versions: 4

Compression:

Stored size: 1020 Bytes

Contents

module Sidekiq
  module Grouping
    # Middleware that diverts jobs of batch-enabled workers into a
    # Sidekiq::Grouping::Batch instead of passing them down the chain.
    #
    # A worker counts as batch-enabled when its sidekiq options contain the
    # 'batch_flush_size' or 'batch_flush_interval' key.
    class Middleware
      # Decides whether the job continues down the middleware chain (the
      # given block is invoked) or is appended to the worker's batch.
      #
      # The job is passed through to the block when:
      # * the worker is not batch-enabled, or
      # * the first element of msg['args'] is literally +true+ (that marker
      #   is removed from msg['args'] before yielding), or
      # * msg['failed_at'] is present (the job is being retried).
      #
      # @param worker_class [Class, String] worker class or its name
      # @param msg [Hash] job payload; 'args' and 'failed_at' are read
      # @param queue [Object] queue identifier forwarded to the batch
      # @param redis_pool [Object, nil] forwarded to the batch unchanged
      # @return [Object, nil] the block's result when the job passes
      #   through; nil when the job was added to a batch (block not invoked)
      def call(worker_class, msg, queue, redis_pool = nil)
        # `camelize` rather than `classify`: `classify` also singularizes, so
        # a name such as "SyncData" would be resolved as "SyncDatum".
        worker_class = worker_class.camelize.constantize if worker_class.is_a?(String)
        options = worker_class.get_sidekiq_options

        batch_enabled =
          options.key?('batch_flush_size') ||
          options.key?('batch_flush_interval')
        return yield unless batch_enabled

        args = msg['args']
        # The marker must be the literal +true+, not merely a truthy value,
        # hence the explicit comparison.
        passthrough = args.is_a?(Array) && args.first == true
        retrying = msg['failed_at'].present?

        if passthrough || retrying
          # Drop the marker so the block sees only the remaining arguments.
          args.shift if passthrough
          yield
        else
          add_to_batch(worker_class, queue, msg, redis_pool)
        end
      end

      private

      # Appends the job's arguments to the batch identified by the worker
      # class name and queue.
      #
      # @return [nil] always nil, regardless of what Batch#add returns
      def add_to_batch(worker_class, queue, msg, redis_pool = nil)
        Sidekiq::Grouping::Batch
          .new(worker_class.name, queue, redis_pool)
          .add(msg['args'])

        nil
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
sidekiq-grouping-1.0.6 lib/sidekiq/grouping/middleware.rb
sidekiq-grouping-1.0.4 lib/sidekiq/grouping/middleware.rb
sidekiq-grouping-1.0.3 lib/sidekiq/grouping/middleware.rb
sidekiq-grouping-1.0.2 lib/sidekiq/grouping/middleware.rb