lib/redis/stream/client.rb in redis-stream-0.4.2 vs lib/redis/stream/client.rb in redis-stream-0.4.3

- old
+ new

@@ -78,22 +78,23 @@ # @param [Hash{String->String}] Options # # no passthrough variable here. The passthrough is available in the start method def add(data = {}, options = {}) raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING) + add_id = nil + OpenTracing.start_active_span('add') do |scope| + default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "cache_key" => nil, "tracer" => nil} + options = default_options.merge(options) - default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "cache_key" => nil, "tracer" => nil} - options = default_options.merge(options) + type = options["type"] + to = options["to"] + group = options["group"] + payload = build_payload(data, options) + add_id = @redis.xadd(@stream, payload) - type = options["type"] - to = options["to"] - group = options["group"] - payload = build_payload(data, options) - add_id = @redis.xadd(@stream, payload) - # @send_queue << add_id - - @logger.info("#{@consumer_id} - send to '#{to}' in group '#{group}' with id '#{add_id}' of type '#{type}'") + @logger.info("#{@consumer_id} - send to '#{to}' in group '#{group}' with id '#{add_id}' of type '#{type}'") + end add_id end # sync_add: same as add command but synchronous. Blocks call until a message arrives # @param [Object] data Any data you want to transmit @@ -200,13 +201,10 @@ yield scope if block_given? end end end - def trace_error(operation_name, span = nil) - trace(operation_name, span) - end private def build_payload(data, options) to = options['to'] group = options['group'] @@ -302,64 +300,56 @@ #read message from the stream # @param [Boolean] async return the message if synchronous else call handle_incoming # @param [Boolean] passthrough Receive all messages also the ones intended for other consumers def read_next_message_from_stream(async = true, passthrough = false) - if @state == Redis::Stream::State::RUNNING - result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil? - result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group + if @state == Redis::Stream::State::RUNNING + result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil? + result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group - unless result.empty? - id, data_out = result[@stream][0] - ack_count = @redis.xack(@stream, @group, id) if @group + unless result.empty? + id, data_out = result[@stream][0] + ack_count = @redis.xack(@stream, @group, id) if @group - tracer_data = JSON.parse(data_out["tracer"]) - unless tracer_data.nil? || tracer_data.empty? - tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data) - data_out["tracer"] = OpenTracing.start_active_span(@consumer_id, child_of: tracer_span) - end + tracer_data = JSON.parse(data_out["tracer"]) + unless tracer_data.nil? || tracer_data.empty? + #trace_scope.span.log_kv('has_tracer' => true) + tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data) + data_out["tracer"] = OpenTracing.start_span(@consumer_id, child_of: tracer_span) + end - begin - data_out["payload"] = JSON.parse(data_out["payload"]) - rescue Exception => e - @logger.error("#{@consumer_id} error parsing payload: #{e.message}") - end + begin + data_out["payload"] = JSON.parse(data_out["payload"]) + rescue Exception => e + @logger.error("#{@consumer_id} error parsing payload: #{e.message}") + end - # if @send_queue.include?(id) - # @send_queue.delete(id) - # @logger.warning("#{@consumer_id} - send queue is not empty: #{@send_queue.join(',')}") if @send_queue.length > 0 - # unless passthrough - # #@logger.info("#{@consumer_id} - ignoring self") - # return false - # end - # end + if data_out["from"].eql?(@consumer_id) + return false + end - if data_out["from"].eql?(@consumer_id) - return false - end + unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) && + (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group)) + @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'") - unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) && - (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group)) - @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'") + return false + end - return false - end + @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}") - @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}") + if data_out["type"].eql?(Redis::Stream::Type::PING) + add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG) + return false + end - if data_out["type"].eql?(Redis::Stream::Type::PING) - add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG) - return false - end + if data_out["type"].eql?(Redis::Stream::Type::PONG) + return false + end - if data_out["type"].eql?(Redis::Stream::Type::PONG) - return false + return data_out unless async + handle_incoming(data_out) end - - return data_out unless async - handle_incoming(data_out) end - end rescue Exception => e return false end end end \ No newline at end of file