lib/redis/stream/client.rb in redis-stream-0.4.2 vs lib/redis/stream/client.rb in redis-stream-0.4.3
- old
+ new
@@ -78,22 +78,23 @@
# @param [Hash{String->String}] Options
#
# no passthrough variable here. The passthrough is available in the start method
def add(data = {}, options = {})
raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING)
+ add_id = nil
+ OpenTracing.start_active_span('add') do |scope|
+ default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "cache_key" => nil, "tracer" => nil}
+ options = default_options.merge(options)
- default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "cache_key" => nil, "tracer" => nil}
- options = default_options.merge(options)
+ type = options["type"]
+ to = options["to"]
+ group = options["group"]
+ payload = build_payload(data, options)
+ add_id = @redis.xadd(@stream, payload)
- type = options["type"]
- to = options["to"]
- group = options["group"]
- payload = build_payload(data, options)
- add_id = @redis.xadd(@stream, payload)
- # @send_queue << add_id
-
- @logger.info("#{@consumer_id} - send to '#{to}' in group '#{group}' with id '#{add_id}' of type '#{type}'")
+ @logger.info("#{@consumer_id} - send to '#{to}' in group '#{group}' with id '#{add_id}' of type '#{type}'")
+ end
add_id
end
# sync_add: same as add command but synchronous. Blocks call until a message arrives
# @param [Object] data Any data you want to transmit
@@ -200,13 +201,10 @@
yield scope if block_given?
end
end
end
- def trace_error(operation_name, span = nil)
- trace(operation_name, span)
- end
private
def build_payload(data, options)
to = options['to']
group = options['group']
@@ -302,64 +300,56 @@
#read message from the stream
# @param [Boolean] async return the message if synchronous else call handle_incoming
# @param [Boolean] passthrough Receive all messages also the ones intended for other consumers
def read_next_message_from_stream(async = true, passthrough = false)
- if @state == Redis::Stream::State::RUNNING
- result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil?
- result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group
+ if @state == Redis::Stream::State::RUNNING
+ result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil?
+ result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group
- unless result.empty?
- id, data_out = result[@stream][0]
- ack_count = @redis.xack(@stream, @group, id) if @group
+ unless result.empty?
+ id, data_out = result[@stream][0]
+ ack_count = @redis.xack(@stream, @group, id) if @group
- tracer_data = JSON.parse(data_out["tracer"])
- unless tracer_data.nil? || tracer_data.empty?
- tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data)
- data_out["tracer"] = OpenTracing.start_active_span(@consumer_id, child_of: tracer_span)
- end
+ tracer_data = JSON.parse(data_out["tracer"])
+ unless tracer_data.nil? || tracer_data.empty?
+ #trace_scope.span.log_kv('has_tracer' => true)
+ tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data)
+ data_out["tracer"] = OpenTracing.start_span(@consumer_id, child_of: tracer_span)
+ end
- begin
- data_out["payload"] = JSON.parse(data_out["payload"])
- rescue Exception => e
- @logger.error("#{@consumer_id} error parsing payload: #{e.message}")
- end
+ begin
+ data_out["payload"] = JSON.parse(data_out["payload"])
+ rescue Exception => e
+ @logger.error("#{@consumer_id} error parsing payload: #{e.message}")
+ end
- # if @send_queue.include?(id)
- # @send_queue.delete(id)
- # @logger.warning("#{@consumer_id} - send queue is not empty: #{@send_queue.join(',')}") if @send_queue.length > 0
- # unless passthrough
- # #@logger.info("#{@consumer_id} - ignoring self")
- # return false
- # end
- # end
+ if data_out["from"].eql?(@consumer_id)
+ return false
+ end
- if data_out["from"].eql?(@consumer_id)
- return false
- end
+ unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) &&
+ (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group))
+ @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'")
- unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) &&
- (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group))
- @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'")
+ return false
+ end
- return false
- end
+ @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}")
- @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}")
+ if data_out["type"].eql?(Redis::Stream::Type::PING)
+ add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG)
+ return false
+ end
- if data_out["type"].eql?(Redis::Stream::Type::PING)
- add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG)
- return false
- end
+ if data_out["type"].eql?(Redis::Stream::Type::PONG)
+ return false
+ end
- if data_out["type"].eql?(Redis::Stream::Type::PONG)
- return false
+ return data_out unless async
+ handle_incoming(data_out)
end
-
- return data_out unless async
- handle_incoming(data_out)
end
- end
rescue Exception => e
return false
end
end
end
\ No newline at end of file