require_relative "redis" require "active_support/core_ext/hash/indifferent_access" class Sidekiq::Merger::Merge class << self def all redis = Sidekiq::Merger::Redis.new redis.all_merges.map { |full_merge_key| initialize_with_full_merge_key(full_merge_key, redis: redis) } end def initialize_with_full_merge_key(full_merge_key, options = {}) keys = full_merge_key.split(":") raise "Invalid merge key" if keys.size < 3 worker_class = keys[0].camelize.constantize queue = keys[1] merge_key = keys[2] new(worker_class, queue, merge_key, options) end def initialize_with_args(worker_class, queue, args, options = {}) new(worker_class, queue, merge_key(worker_class, args), options) end def merge_key(worker_class, args) options = get_options(worker_class) merge_key = options["key"] if merge_key.respond_to?(:call) merge_key = merge_key.call(args) end merge_key = "" if merge_key.nil? merge_key = merge_key.to_json unless merge_key.is_a?(String) merge_key end def get_options(worker_class) (worker_class.get_sidekiq_options["merger"] || {}).with_indifferent_access end end attr_reader :worker_class, :queue, :merge_key def initialize(worker_class, queue, merge_key, redis: Sidekiq::Merger::Redis.new) @worker_class = worker_class @queue = queue @merge_key = merge_key @redis = redis end def add(args, execution_time) if !options[:unique] || !@redis.merge_exists?(full_merge_key, args) @redis.push_message(full_merge_key, args, execution_time) end end def delete(args) @redis.delete_message(full_merge_key, args) end def delete_all @redis.delete_merge(full_merge_key) end def size @redis.merge_size(full_merge_key) end def flush msgs = [] if @redis.lock_merge(full_merge_key, Sidekiq::Merger::Config.lock_ttl) msgs = @redis.pluck_merge(full_merge_key) end unless msgs.empty? batches = options[:batch_size].nil? ? [msgs] : msgs.each_slice(options[:batch_size].to_i).to_a batches.each do |batch_msgs| # preserve FIFO when enqueuing batches Sidekiq::Client.push( "class" => worker_class, "queue" => queue, "args" => batch_msgs, "merged" => true ) end end end def can_flush? !execution_time.nil? && execution_time < Time.now end def full_merge_key @full_merge_key ||= [worker_class.name.to_s.underscore, queue, merge_key].join(":") end def all_args @redis.get_merge(full_merge_key) end def execution_time @execution_time ||= @redis.merge_execution_time(full_merge_key) end def ==(other) self.worker_class == other.worker_class && self.queue == other.queue && self.merge_key == other.merge_key end private def options @options ||= self.class.get_options(worker_class) rescue NameError {} end end