Sha256: b1b95300e5694b90430fb8b539cc9c0257bbbba02e52c9d9590f3e2af7ed7367

Contents?: true

Size: 1.95 KB

Versions: 3

Compression:

Stored size: 1.95 KB

Contents

module Sidekiq
  module Grouping
    class Redis

      PLUCK_SCRIPT = <<-SCRIPT
        local pluck_values = redis.call('lrange', KEYS[1], 0, ARGV[1] - 1)
        redis.call('ltrim', KEYS[1], ARGV[1], -1)
        for k, v in pairs(pluck_values) do
          redis.call('srem', KEYS[2], v)
        end
        return pluck_values
      SCRIPT

      def push_msg(name, msg, remember_unique = false)
        redis do |conn|
          conn.multi do
            conn.sadd(ns('batches'), name)
            conn.rpush(ns(name), msg)
            conn.sadd(unique_messages_key(name), msg) if remember_unique
          end
        end
      end

      def enqueued?(name, msg)
        redis do |conn|
          conn.sismember(unique_messages_key(name), msg)
        end
      end

      def batch_size(name)
        redis { |conn| conn.llen(ns(name)) }
      end

      def batches
        redis { |conn| conn.smembers(ns('batches')) }
      end

      def pluck(name, limit)
        keys = [ns(name), unique_messages_key(name)]
        args = [limit]
        redis { |conn| conn.eval PLUCK_SCRIPT, keys, args }
      end

      def get_last_execution_time(name)
        redis { |conn| conn.get(ns("last_execution_time:#{name}")) }
      end

      def set_last_execution_time(name, time)
        redis { |conn| conn.set(ns("last_execution_time:#{name}"), time.to_json) }
      end

      def lock(name)
        redis do |conn|
          id = ns("lock:#{name}")
          conn.set(id, true, nx: true, ex: Sidekiq::Grouping::Config.lock_ttl)
        end
      end

      def delete(name)
        redis do |conn|
          conn.del(ns("last_execution_time:#{name}"))
          conn.del(ns(name))
          conn.srem(ns('batches'), name)
        end
      end

      private

      def unique_messages_key name
        ns("#{name}:unique_messages")
      end

      def ns(key = nil)
        "batching:#{key}"
      end

      def redis(&block)
        Sidekiq.redis(&block)
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
sidekiq-grouping-1.0.10 lib/sidekiq/grouping/redis.rb
sidekiq-grouping-1.0.9 lib/sidekiq/grouping/redis.rb
sidekiq-grouping-1.0.8 lib/sidekiq/grouping/redis.rb