lib/redis/stream/client.rb in redis-stream-0.4.4 vs lib/redis/stream/client.rb in redis-stream-0.4.5
- old
+ new
@@ -217,12 +217,12 @@
OpenTracing.inject(tracer_span.context, OpenTracing::FORMAT_TEXT_MAP, tracer_data)
end
payload = nil
unless @cache.nil?
- if options.include?("cache_key")
- cache_key = @cache.build_key(data)
+ if options.include?("cache_key") && not(options['cache_key'].nil?)
+ cache_key = options["cache_key"] # @cache.build_key(data)
if @cache.include?(cache_key)
if data && data.include?('from_cache') && data['from_cache'].eql?(0)
@cache.delete(cache_key)
@logger.info("#{@consumer_id} - invalidating cache with key #{cache_key}")
else
@@ -235,18 +235,20 @@
tracer: tracer_data.to_json,
payload: @cache[cache_key].to_json
}
@logger.info("#{@consumer_id} - fetching from cache with key #{cache_key}")
end
-
+ else
+ #cache_key = @cache.build_key(data)
+ unless cache_key.nil? || cache_key.empty?
+ @logger.info("#{@consumer_id} - caching with key #{cache_key}")
+ @cache[cache_key] = data
+ end
end
- else
- @logger.info("#{@consumer_id} - fetching from cache with key #{cache_key}")
- @cache[options["cache_key"]] = data
end
-
end
+
if payload.nil?
payload = {
type: type,
from: @consumer_id,
from_group: @group,
@@ -301,56 +303,56 @@
#read message from the stream
# @param [Boolean] async return the message if synchronous else call handle_incoming
# @param [Boolean] passthrough Receive all messages also the ones intended for other consumers
def read_next_message_from_stream(async = true, passthrough = false)
- if @state == Redis::Stream::State::RUNNING
- result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil?
- result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group
+ if @state == Redis::Stream::State::RUNNING
+ result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil?
+ result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group
- unless result.empty?
- id, data_out = result[@stream][0]
- ack_count = @redis.xack(@stream, @group, id) if @group
+ unless result.empty?
+ id, data_out = result[@stream][0]
+ ack_count = @redis.xack(@stream, @group, id) if @group
- tracer_data = JSON.parse(data_out["tracer"])
- unless tracer_data.nil? || tracer_data.empty?
- #trace_scope.span.log_kv('has_tracer' => true)
- tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data)
- data_out["tracer"] = OpenTracing.start_span(@consumer_id, child_of: tracer_span)
- end
+ tracer_data = JSON.parse(data_out["tracer"])
+ unless tracer_data.nil? || tracer_data.empty?
+ #trace_scope.span.log_kv('has_tracer' => true)
+ tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data)
+ data_out["tracer"] = OpenTracing.start_span(@consumer_id, child_of: tracer_span)
+ end
- begin
- data_out["payload"] = JSON.parse(data_out["payload"])
- rescue Exception => e
- @logger.error("#{@consumer_id} error parsing payload: #{e.message}")
- end
+ begin
+ data_out["payload"] = JSON.parse(data_out["payload"])
+ rescue Exception => e
+ @logger.error("#{@consumer_id} error parsing payload: #{e.message}")
+ end
- if data_out["from"].eql?(@consumer_id)
- return false
- end
+ if data_out["from"].eql?(@consumer_id)
+ return false
+ end
- unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) &&
- (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group))
- @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'")
+ unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) &&
+ (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group))
+ @logger.info("#{@consumer_id} - ignoring message from '#{data_out["from"]}' to '#{data_out["to"]}-#{data_out["to_group"]}'")
- return false
- end
+ return false
+ end
- @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}")
+ @logger.info("#{@consumer_id} - received from '#{data_out["from"]}' of type '#{data_out['type']}' to '#{data_out["to"]}' in group '#{data_out["to_group"]}' with message id '#{id}' - with ack #{ack_count}")
- if data_out["type"].eql?(Redis::Stream::Type::PING)
- add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG)
- return false
- end
+ if data_out["type"].eql?(Redis::Stream::Type::PING)
+ add(data_out["payload"].to_s, "to" => data_out["from"], "group" => "*", "type" => Redis::Stream::Type::PONG)
+ return false
+ end
- if data_out["type"].eql?(Redis::Stream::Type::PONG)
- return false
- end
-
- return data_out unless async
- handle_incoming(data_out)
+ if data_out["type"].eql?(Redis::Stream::Type::PONG)
+ return false
end
+
+ return data_out unless async
+ handle_incoming(data_out)
end
+ end
rescue Exception => e
return false
end
end
end
\ No newline at end of file