lib/redis/stream/client.rb in redis-stream-0.4.1 vs lib/redis/stream/client.rb in redis-stream-0.4.2
- old
+ new
@@ -4,11 +4,11 @@
require "json"
require "thread"
require "redis/stream/inspect"
require "redis/stream/config"
require "redis/stream/data_cache"
-require "redis/stream/tracer/zipkin_tracer"
+require "zipkin/tracer"
class Redis
module Stream
class Client
include Redis::Stream::Inspect
@@ -27,21 +27,21 @@
default_options = {"host" => "127.0.0.1",
"port" => 6379,
"db" => 0,
"config_file_path" => '.',
"logger" => Logger.new(STDOUT),
- "zipkin_config" => {}}
+ "tracer" => nil
+ }
options = default_options.merge(options)
Redis::Stream::Config.path = options['config_file_path']
host = options["host"]
port = options["port"]
db = options["db"]
@logger = options["logger"]
@cache = options.include?('caching') && options['caching'] ? Redis::Stream::DataCache.new : nil
- @tracer = ZipkinTracer::RedisStreamHandler.new(self, options["zipkin_config"])
@name = name
@state = Redis::Stream::State::IDLE
@stream = stream_name
@group = group_name
if options.include?('redis')
@@ -58,35 +58,32 @@
@non_blocking = nil
# @send_queue = []
raise "No redis" if @redis.nil?
+ if options.has_key?('tracer') && !options['tracer'].nil?
+ OpenTracing.global_tracer = options["tracer"]
+ elsif Redis::Stream::Config.include?(:zipkin)
+ OpenTracing.global_tracer = Zipkin::Tracer.build(url: Redis::Stream::Config[:zipkin], service_name: "#{@name}-#{@group}", flush_interval: 1)
+ end
+
@state = Redis::Stream::State::RUNNING if options.include?("sync_start") && options["sync_start"]
setup_stream
@last_id = info['last-generated-id'] rescue '0'
@logger.info "#{@consumer_id} - Last ID = #{@last_id}"
end
- def trace(topic, span = nil, &block)
- @tracer.trace(topic, span, &block)
- end
-
- def trace_error(msg, span, &block)
- @tracer.trace_error(msg, span, &block)
- end
-
-
# add: add a message to the stream
# @param [Object] data Any data you want to transmit
# @param [Hash{String->String}] Options
#
# no passthrough variable here. The passthrough is available in the start method
def add(data = {}, options = {})
raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING)
- default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "cache_key" => nil}
+ default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "cache_key" => nil, "tracer" => nil}
options = default_options.merge(options)
type = options["type"]
to = options["to"]
group = options["group"]
@@ -102,21 +99,21 @@
# @param [Object] data Any data you want to transmit
# @param [Hash{String->String}] Options
def sync_add(data = {}, options = {})
raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING)
- default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "time_out" => 5, "passthrough" => false, "cache_key" => nil}
+ default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "time_out" => 5, "passthrough" => false, "cache_key" => nil, "tracer" => nil}
options = default_options.merge(options)
to = options["to"]
group = options["group"]
passthrough = options["passthrough"]
time_out = options["time_out"]
#@state = Redis::Stream::State::RUNNING
data_out = nil
- add_id = add(data, "to" => to, "group" => group, "type" => options["type"], "cache_key" => options["cache_key"])
+ add_id = add(data, "to" => to, "group" => group, "type" => options["type"], "cache_key" => options["cache_key"], "tracer" => options['tracer'])
time = Time.now
loop do
timing = ((Time.now - time)).to_i
@@ -186,18 +183,43 @@
end
end
end
end
+ def trace(operation_name, parent_scope = nil)
+ # span = OpenTracing.start_span(operation_name)
+ # yield span if block_given?
+ # span.finish
+ # active_scope = OpenTracing.scope_manager.active
+ if parent_scope.is_a?(Zipkin::Scope)
+ OpenTracing.start_active_span(operation_name, child_of: parent_scope.span) do |scope|
+ yield scope if block_given?
+ end
+ else
+ OpenTracing.start_active_span(operation_name) do |scope|
+ yield scope if block_given?
+ end
+ end
+ end
+
+ def trace_error(operation_name, span = nil)
+ trace(operation_name, span)
+ end
private
def build_payload(data, options)
to = options['to']
group = options['group']
type = options['type']
-
+ tracer_data = {}
+ if options['tracer'].nil?
+ tracer_data = nil
+ else
+ tracer_span = options['tracer']
+ OpenTracing.inject(tracer_span.context, OpenTracing::FORMAT_TEXT_MAP, tracer_data)
+ end
payload = nil
unless @cache.nil?
if options["cache_key"].nil?
cache_key = @cache.build_key(data)
@@ -210,10 +232,11 @@
type: type,
from: to,
from_group: group,
to: @consumer_id,
to_group: @group,
+ tracer: tracer_data.to_json,
payload: @cache[cache_key].to_json
}
@logger.info("#{@consumer_id} - fetching from cache with key #{cache_key}")
end
@@ -228,10 +251,11 @@
type: type,
from: @consumer_id,
from_group: @group,
to: to,
to_group: group,
+ tracer: tracer_data.to_json,
payload: data.to_json
}
end
payload
end
@@ -286,10 +310,16 @@
unless result.empty?
id, data_out = result[@stream][0]
ack_count = @redis.xack(@stream, @group, id) if @group
+ tracer_data = JSON.parse(data_out["tracer"])
+ unless tracer_data.nil? || tracer_data.empty?
+ tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data)
+ data_out["tracer"] = OpenTracing.start_active_span(@consumer_id, child_of: tracer_span)
+ end
+
begin
data_out["payload"] = JSON.parse(data_out["payload"])
rescue Exception => e
@logger.error("#{@consumer_id} error parsing payload: #{e.message}")
end
@@ -301,10 +331,10 @@
# #@logger.info("#{@consumer_id} - ignoring self")
# return false
# end
# end
- if (data_out["from"].eql?(@consumer_id))
+ if data_out["from"].eql?(@consumer_id)
return false
end
unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) &&
(data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group))
\ No newline at end of file