lib/redis/stream/client.rb in redis-stream-0.4.8 vs lib/redis/stream/client.rb in redis-stream-0.4.9
- old
+ new
@@ -315,10 +315,11 @@
result = nil
@redis_pool.with do |redis|
result = redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil?
result = redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group
end
+
unless result.empty?
id, data_out = result[@stream][0]
ack_count = 0
@redis_pool.with do |redis|
ack_count = redis.xack(@stream, @group, id) if @group
@@ -338,25 +339,27 @@
if data_out["from"].eql?(@consumer_id)
return false
end
- unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) &&
- (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group))
- @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'")
+ unless passthrough
+ unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) &&
+ (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group))
+ @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'")
- return false
+ return false
+ end
end
@logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}")
if data_out["type"].eql?(Redis::Stream::Type::PING)
add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG)
return false
end
if data_out["type"].eql?(Redis::Stream::Type::PONG)
- return false
+ return true
end
return data_out unless async
handle_incoming(data_out)
end
\ No newline at end of file