Sha256: bdb1caf4eb2165dbc62cb46efac79c74b316f553d573a39cb662fea326a7fbdf
Contents?: true
Size: 1020 Bytes
Versions: 4
Compression:
Stored size: 1020 Bytes
Contents
module Sidekiq
  module Grouping
    # Sidekiq middleware that diverts jobs of batch-enabled workers into a
    # Sidekiq::Grouping::Batch instead of letting them continue down the
    # middleware chain.
    #
    # A worker counts as batch-enabled when its sidekiq options contain the
    # key 'batch_flush_size' or 'batch_flush_interval'.
    class Middleware
      # Option keys whose presence marks a worker as batch-enabled.
      BATCH_OPTION_KEYS = %w[batch_flush_size batch_flush_interval].freeze

      # Middleware entry point.
      #
      # @param worker_class [Class, String] the worker class, or its name
      # @param msg [Hash] job payload; 'args' and 'failed_at' are read
      # @param queue [String] name of the queue the job targets
      # @param redis_pool [Object, nil] forwarded untouched to the batch
      # @return [Object, nil] the value of the given block when the job is
      #   passed through, or nil when the job was added to a batch
      def call(worker_class, msg, queue, redis_pool = nil)
        # `camelize` (not `classify`): `classify` singularizes its input, so a
        # worker named e.g. "SyncOrders" would be looked up as "SyncOrder".
        worker_class = worker_class.camelize.constantize if worker_class.is_a?(String)
        options = worker_class.get_sidekiq_options

        batch = BATCH_OPTION_KEYS.any? { |key| options.key?(key) }
        return yield unless batch

        # A literal `true` as the first argument marks a job that must bypass
        # batching (presumably one emitted when a batch is flushed - confirm
        # against the flushing code).
        args = msg['args']
        passthrough = args.is_a?(Array) && args.first == true

        # A payload carrying 'failed_at' is treated as a retry and is never
        # re-batched.
        retrying = msg["failed_at"].present?

        if passthrough || retrying
          # Strip the marker so the worker receives only its real arguments.
          msg['args'].shift if passthrough
          yield
        else
          add_to_batch(worker_class, queue, msg, redis_pool)
        end
      end

      private

      # Appends the job's arguments to the batch for this worker/queue pair.
      #
      # Always returns nil, which `call` returns without yielding, so the
      # rest of the middleware chain is not run for a batched job.
      def add_to_batch(worker_class, queue, msg, redis_pool = nil)
        Sidekiq::Grouping::Batch
          .new(worker_class.name, queue, redis_pool)
          .add(msg['args'])

        nil
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems