Sha256: 562aa80ad2044f0a8a08442925ade9800d6576fabdc8f9897ec7c9ff8f437f4d
Contents?: true
Size: 1.06 KB
Versions: 1
Compression:
Stored size: 1.06 KB
Contents
require "json"
require "metacrunch/redis"

module Metacrunch
  # Reads JSON-encoded entries from the head of a Redis list and yields each
  # one as a parsed Ruby object.
  #
  # Two modes are supported:
  #
  # * non-blocking (default): entries are popped with +LPOP+ until the list
  #   is empty, then iteration ends.
  # * blocking: entries are popped with +BLPOP+ in an endless loop. Whenever
  #   +BLPOP+ comes back without a usable payload (e.g. on timeout), +nil+ is
  #   yielded so the consumer can decide what to do.
  class Redis::QueueReader
    include Metacrunch::ParallelProcessableReader

    # Matches payloads that are empty or consist solely of whitespace.
    # Such payloads are yielded as +nil+ instead of being handed to the parser.
    BLANK_PAYLOAD = /\A[[:space:]]*\z/.freeze

    # @param redis_connection_or_url [String, Object] a Redis URL (a new
    #   +::Redis+ connection is created from it) or an already established
    #   connection object, which is used as is.
    # @param queue_name [String] name of the Redis list to read from.
    # @param options [Hash] optional settings; the hash is not modified.
    # @option options [Boolean] :blocking (false) use +BLPOP+ instead of +LPOP+.
    # @option options [Numeric] :blocking_timeout (0) timeout passed to +BLPOP+.
    #   NOTE(review): Redis documents a timeout of 0 as "block indefinitely" —
    #   confirm against the redis-rb version in use.
    # @raise [ArgumentError] if +queue_name+ is not a String.
    def initialize(redis_connection_or_url, queue_name, options = {})
      # Validate before touching any state.
      raise ArgumentError, "queue_name must be a string" unless queue_name.is_a?(String)

      @queue_name = queue_name

      # Read (rather than delete) the options so the caller's hash stays
      # untouched and frozen hashes are accepted.
      @blocking_mode = options[:blocking] || false
      @blocking_timeout = options[:blocking_timeout] || 0

      @redis =
        if redis_connection_or_url.is_a?(String)
          ::Redis.new(url: redis_connection_or_url)
        else
          redis_connection_or_url
        end
    end

    # Pops entries from the queue and yields each one parsed from JSON.
    #
    # In blocking mode this loops until the given block breaks out or an
    # exception is raised.
    #
    # @yieldparam data [Object, nil] the parsed JSON value; +nil+ in blocking
    #   mode when no usable payload was received.
    # @return [Enumerator] if no block is given.
    # @return [self] once the queue has been drained (non-blocking mode).
    # @raise [JSON::ParserError] if a payload is not valid JSON.
    def each(&block)
      return enum_for(__method__) unless block_given?

      if @blocking_mode
        # `while true` instead of `loop`: Kernel#loop silently rescues
        # StopIteration, which would hide one raised by the consumer block.
        while true
          _list, payload = @redis.blpop(@queue_name, timeout: @blocking_timeout)

          # Plain-Ruby presence check; does not rely on ActiveSupport's
          # Object#present? being loaded.
          if payload.nil? || payload =~ BLANK_PAYLOAD
            yield nil
          else
            yield JSON.parse(payload)
          end
        end
      else
        while (payload = @redis.lpop(@queue_name))
          yield JSON.parse(payload)
        end
      end

      self
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
metacrunch-3.1.4 | lib/metacrunch/redis/queue_reader.rb |