lib/redis/stream/client.rb in redis-stream-0.4.4 vs lib/redis/stream/client.rb in redis-stream-0.4.5

- old
+ new

@@ -217,12 +217,12 @@ OpenTracing.inject(tracer_span.context, OpenTracing::FORMAT_TEXT_MAP, tracer_data) end payload = nil unless @cache.nil? - if options.include?("cache_key") - cache_key = @cache.build_key(data) + if options.include?("cache_key") && not(options['cache_key'].nil?) + cache_key = options["cache_key"] # @cache.build_key(data) if @cache.include?(cache_key) if data && data.include?('from_cache') && data['from_cache'].eql?(0) @cache.delete(cache_key) @logger.info("#{@consumer_id} - invalidating cache with key #{cache_key}") else @@ -235,18 +235,20 @@ tracer: tracer_data.to_json, payload: @cache[cache_key].to_json } @logger.info("#{@consumer_id} - fetching from cache with key #{cache_key}") end - + else + #cache_key = @cache.build_key(data) + unless cache_key.nil? || cache_key.empty? + @logger.info("#{@consumer_id} - caching with key #{cache_key}") + @cache[cache_key] = data + end end - else - @logger.info("#{@consumer_id} - fetching from cache with key #{cache_key}") - @cache[options["cache_key"]] = data end - end + if payload.nil? payload = { type: type, from: @consumer_id, from_group: @group, @@ -301,56 +303,56 @@ #read message from the stream # @param [Boolean] async return the message if synchronous else call handle_incoming # @param [Boolean] passthrough Receive all messages also the ones intended for other consumers def read_next_message_from_stream(async = true, passthrough = false) - if @state == Redis::Stream::State::RUNNING - result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil? - result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group + if @state == Redis::Stream::State::RUNNING + result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil? + result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group - unless result.empty? - id, data_out = result[@stream][0] - ack_count = @redis.xack(@stream, @group, id) if @group + unless result.empty? + id, data_out = result[@stream][0] + ack_count = @redis.xack(@stream, @group, id) if @group - tracer_data = JSON.parse(data_out["tracer"]) - unless tracer_data.nil? || tracer_data.empty? - #trace_scope.span.log_kv('has_tracer' => true) - tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data) - data_out["tracer"] = OpenTracing.start_span(@consumer_id, child_of: tracer_span) - end + tracer_data = JSON.parse(data_out["tracer"]) + unless tracer_data.nil? || tracer_data.empty? + #trace_scope.span.log_kv('has_tracer' => true) + tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data) + data_out["tracer"] = OpenTracing.start_span(@consumer_id, child_of: tracer_span) + end - begin - data_out["payload"] = JSON.parse(data_out["payload"]) - rescue Exception => e - @logger.error("#{@consumer_id} error parsing payload: #{e.message}") - end + begin + data_out["payload"] = JSON.parse(data_out["payload"]) + rescue Exception => e + @logger.error("#{@consumer_id} error parsing payload: #{e.message}") + end - if data_out["from"].eql?(@consumer_id) - return false - end + if data_out["from"].eql?(@consumer_id) + return false + end - unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) && - (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group)) - @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'") + unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) && + (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group)) + @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'") - return false - end + return false + end - @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}") + @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}") - if data_out["type"].eql?(Redis::Stream::Type::PING) - add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG) - return false - end + if data_out["type"].eql?(Redis::Stream::Type::PING) + add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG) + return false + end - if data_out["type"].eql?(Redis::Stream::Type::PONG) - return false - end - - return data_out unless async - handle_incoming(data_out) + if data_out["type"].eql?(Redis::Stream::Type::PONG) + return false end + + return data_out unless async + handle_incoming(data_out) end + end rescue Exception => e return false end end end \ No newline at end of file