lib/redis/stream/client.rb in redis-stream-0.4.8 vs lib/redis/stream/client.rb in redis-stream-0.4.9

- old
+ new

@@ -315,10 +315,11 @@ result = nil @redis_pool.with do |redis| result = redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil? result = redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group end + unless result.empty? id, data_out = result[@stream][0] ack_count = 0 @redis_pool.with do |redis| ack_count = redis.xack(@stream, @group, id) if @group @@ -338,25 +339,27 @@ if data_out["from"].eql?(@consumer_id) return false end - unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) && - (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group)) - @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'") + unless passthrough + unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) && + (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group)) + @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'") - return false + return false + end end @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}") if data_out["type"].eql?(Redis::Stream::Type::PING) add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG) return false end if data_out["type"].eql?(Redis::Stream::Type::PONG) - return false + return true end return data_out unless async handle_incoming(data_out) end \ No newline at end of file