lib/sidekiq/grouping/batch.rb in sidekiq-grouping-1.1.0 vs lib/sidekiq/grouping/batch.rb in sidekiq-grouping-1.2.0

- old
+ new

@@ -1,56 +1,65 @@ +# frozen_string_literal: true + module Sidekiq module Grouping class Batch - def initialize(worker_class, queue, redis_pool = nil) + def initialize(worker_class, queue, _redis_pool = nil) @worker_class = worker_class @queue = queue @name = "#{worker_class.underscore}:#{queue}" @redis = Sidekiq::Grouping::Redis.new end attr_reader :name, :worker_class, :queue def add(msg) msg = msg.to_json - @redis.push_msg(@name, msg, enqueue_similar_once?) if should_add? msg + return unless should_add? msg + + @redis.push_msg( + @name, + msg, + remember_unique: enqueue_similar_once? + ) end - def should_add? msg + def should_add?(msg) return true unless enqueue_similar_once? + !@redis.enqueued?(@name, msg) end def size @redis.batch_size(@name) end def chunk_size - worker_class_options['batch_size'] || + worker_class_options["batch_size"] || Sidekiq::Grouping::Config.max_batch_size end def pluck_size - worker_class_options['batch_flush_size'] || + worker_class_options["batch_flush_size"] || chunk_size end def pluck - if @redis.lock(@name) - @redis.pluck(@name, pluck_size).map { |value| JSON.parse(value) } - end + return unless @redis.lock(@name) + + @redis.pluck(@name, pluck_size).map { |value| JSON.parse(value) } end def flush chunk = pluck return unless chunk chunk.each_slice(chunk_size) do |subchunk| Sidekiq::Client.push( - 'class' => @worker_class, - 'queue' => @queue, - 'args' => [true, subchunk] + "class" => @worker_class, + "queue" => @queue, + "args" => [true, subchunk] ) end set_current_time_as_last end @@ -72,14 +81,15 @@ last_time = @redis.get_last_execution_time(@name) Time.parse(last_time) if last_time end def next_execution_time - if interval = worker_class_options['batch_flush_interval'] - last_time = last_execution_time - last_time + interval.seconds if last_time - end + interval = worker_class_options["batch_flush_interval"] + return unless interval + + last_time = last_execution_time + last_time + interval.seconds if last_time end def delete @redis.delete(@name) end @@ -97,17 +107,17 @@ next_time = next_execution_time if last_time.blank? set_current_time_as_last false - else - next_time < Time.now if next_time + elsif next_time + next_time < Time.now end end def enqueue_similar_once? - worker_class_options['batch_unique'] == true + worker_class_options["batch_unique"] == true end def set_current_time_as_last @redis.set_last_execution_time(@name, Time.now) end @@ -120,10 +130,10 @@ new(*extract_worker_klass_and_queue(name)) end end def extract_worker_klass_and_queue(name) - klass, queue = name.split(':') + klass, queue = name.split(":") [klass.camelize, queue] end end end end