Sha256: be6cfc055ccfd50106c1b9f1f61d1507df551c0314b3a7de5ddb8add51f24905

Contents?: true

Size: 863 Bytes

Versions: 2

Compression:

Stored size: 863 Bytes

Contents

require "metacrunch/redis"

module Metacrunch
  class Redis::QueueReader

    def initialize(redis_connection_or_url, queue_name, options = {})
      @queue_name = queue_name
      raise ArgumentError, "queue_name must be a string" unless queue_name.is_a?(String)

      @blocking_mode = options.delete(:blocking) || false

      @redis = if redis_connection_or_url.is_a?(String)
        ::Redis.new(url: redis_connection_or_url)
      else
        redis_connection_or_url
      end
    end

    def each(&block)
      return enum_for(__method__) unless block_given?

      if @blocking_mode
        while true
          result = @redis.blpop(@queue_name)
          yield JSON.parse(result[1]) if result
        end
      else
        while result = @redis.lpop(@queue_name)
          yield JSON.parse(result)
        end
      end

      self
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
metacrunch-3.1.1 lib/metacrunch/redis/queue_reader.rb
metacrunch-3.1.0 lib/metacrunch/redis/queue_reader.rb