lib/sidekiq/grouping/batch.rb in sidekiq-grouping-1.1.0 vs lib/sidekiq/grouping/batch.rb in sidekiq-grouping-1.2.0
- old
+ new
@@ -1,56 +1,65 @@
+# frozen_string_literal: true
+
module Sidekiq
module Grouping
class Batch
- def initialize(worker_class, queue, redis_pool = nil)
+ def initialize(worker_class, queue, _redis_pool = nil)
@worker_class = worker_class
@queue = queue
@name = "#{worker_class.underscore}:#{queue}"
@redis = Sidekiq::Grouping::Redis.new
end
attr_reader :name, :worker_class, :queue
def add(msg)
msg = msg.to_json
- @redis.push_msg(@name, msg, enqueue_similar_once?) if should_add? msg
+ return unless should_add? msg
+
+ @redis.push_msg(
+ @name,
+ msg,
+ remember_unique: enqueue_similar_once?
+ )
end
- def should_add? msg
+ def should_add?(msg)
return true unless enqueue_similar_once?
+
!@redis.enqueued?(@name, msg)
end
def size
@redis.batch_size(@name)
end
def chunk_size
- worker_class_options['batch_size'] ||
+ worker_class_options["batch_size"] ||
Sidekiq::Grouping::Config.max_batch_size
end
def pluck_size
- worker_class_options['batch_flush_size'] ||
+ worker_class_options["batch_flush_size"] ||
chunk_size
end
def pluck
- if @redis.lock(@name)
- @redis.pluck(@name, pluck_size).map { |value| JSON.parse(value) }
- end
+ return unless @redis.lock(@name)
+
+ @redis.pluck(@name, pluck_size).map { |value| JSON.parse(value) }
end
def flush
chunk = pluck
return unless chunk
chunk.each_slice(chunk_size) do |subchunk|
Sidekiq::Client.push(
- 'class' => @worker_class,
- 'queue' => @queue,
- 'args' => [true, subchunk]
+ "class" => @worker_class,
+ "queue" => @queue,
+ "args" => [true, subchunk]
)
end
set_current_time_as_last
end
@@ -72,14 +81,15 @@
last_time = @redis.get_last_execution_time(@name)
Time.parse(last_time) if last_time
end
def next_execution_time
- if interval = worker_class_options['batch_flush_interval']
- last_time = last_execution_time
- last_time + interval.seconds if last_time
- end
+ interval = worker_class_options["batch_flush_interval"]
+ return unless interval
+
+ last_time = last_execution_time
+ last_time + interval.seconds if last_time
end
def delete
@redis.delete(@name)
end
@@ -97,17 +107,17 @@
next_time = next_execution_time
if last_time.blank?
set_current_time_as_last
false
- else
- next_time < Time.now if next_time
+ elsif next_time
+ next_time < Time.now
end
end
def enqueue_similar_once?
- worker_class_options['batch_unique'] == true
+ worker_class_options["batch_unique"] == true
end
def set_current_time_as_last
@redis.set_last_execution_time(@name, Time.now)
end
@@ -120,10 +130,10 @@
new(*extract_worker_klass_and_queue(name))
end
end
def extract_worker_klass_and_queue(name)
- klass, queue = name.split(':')
+ klass, queue = name.split(":")
[klass.camelize, queue]
end
end
end
end