Sha256: cb45f033430337699c1b927e5ec6534b3e565e293927e969073e69a7c00e027b

Contents?: true

Size: 737 Bytes

Versions: 1

Compression:

Stored size: 737 Bytes

Contents

require "metacrunch/redis"

module Metacrunch
  # Source that drains a Redis list and hands every popped element to the
  # block given to #each.
  #
  # The +:blocking_mode+ option selects how elements are fetched:
  #
  # * +false+ (default) - elements are popped with +lpop+ until it returns a
  #   falsy value, then #each returns.
  # * +true+ - elements are popped with +blpop+ (+timeout: 0+) in an endless
  #   loop, so #each does not return on its own.
  class Redis::QueueSource

    DEFAULT_OPTIONS = {
      blocking_mode: false
    }.freeze

    # @param redis [#lpop, #blpop] client used to pop elements
    #   (presumably a redis-rb client - confirm against callers)
    # @param queue_name [String] key of the list to read from
    # @param options [Hash] overrides for DEFAULT_OPTIONS
    # @option options [Boolean] :blocking_mode (false) use +blpop+ and loop forever
    def initialize(redis, queue_name, options = {})
      @redis = redis
      @queue_name = queue_name
      @options = DEFAULT_OPTIONS.merge(options)
    end

    # Pops elements off the list and yields them one by one.
    #
    # In blocking mode a blank payload (nil, empty or whitespace-only) is
    # yielded as +nil+; in non-blocking mode payloads are yielded unchanged.
    #
    # @yieldparam payload [String, nil] the popped element
    # @return [Enumerator] when no block is given
    # @return [self] once the non-blocking loop has emptied the list
    def each
      return enum_for(__method__) unless block_given?

      if @options[:blocking_mode]
        # Kernel#loop treats a StopIteration raised by the consumer block as a
        # request to stop; any other exception propagates to the caller.
        loop do
          # blpop answers with a [key, value] pair; only the value is needed.
          # NOTE(review): timeout 0 presumably means "wait indefinitely" -
          # confirm against the Redis client in use.
          _key, payload = @redis.blpop(@queue_name, timeout: 0)
          yield(blank_payload?(payload) ? nil : payload)
        end
      else
        while (payload = @redis.lpop(@queue_name))
          yield payload
        end
      end

      self
    end

    private

    # True when +value+ is nil or contains nothing but whitespace. Plain-Ruby
    # check so this class does not rely on ActiveSupport's #present? being
    # loaded.
    def blank_payload?(value)
      value.nil? || value.to_s !~ /[^[:space:]]/
    end

  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
metacrunch-redis-1.1.0 lib/metacrunch/redis/queue_source.rb