lib/redis/stream/client.rb in redis-stream-0.4.1 vs lib/redis/stream/client.rb in redis-stream-0.4.2

- old
+ new

@@ -4,11 +4,11 @@ require "json" require "thread" require "redis/stream/inspect" require "redis/stream/config" require "redis/stream/data_cache" -require "redis/stream/tracer/zipkin_tracer" +require "zipkin/tracer" class Redis module Stream class Client include Redis::Stream::Inspect @@ -27,21 +27,21 @@ default_options = {"host" => "127.0.0.1", "port" => 6379, "db" => 0, "config_file_path" => '.', "logger" => Logger.new(STDOUT), - "zipkin_config" => {}} + "tracer" => nil + } options = default_options.merge(options) Redis::Stream::Config.path = options['config_file_path'] host = options["host"] port = options["port"] db = options["db"] @logger = options["logger"] @cache = options.include?('caching') && options['caching'] ? Redis::Stream::DataCache.new : nil - @tracer = ZipkinTracer::RedisStreamHandler.new(self, options["zipkin_config"]) @name = name @state = Redis::Stream::State::IDLE @stream = stream_name @group = group_name if options.include?('redis') @@ -58,35 +58,32 @@ @non_blocking = nil # @send_queue = [] raise "No redis" if @redis.nil? + if options.has_key?('tracer') && !options['tracer'].nil? + OpenTracing.global_tracer = options["tracer"] + elsif Redis::Stream::Config.include?(:zipkin) + OpenTracing.global_tracer = Zipkin::Tracer.build(url: Redis::Stream::Config[:zipkin], service_name: "#{@name}-#{@group}", flush_interval: 1) + end + @state = Redis::Stream::State::RUNNING if options.include?("sync_start") && options["sync_start"] setup_stream @last_id = info['last-generated-id'] rescue '0' @logger.info "#{@consumer_id} - Last ID = #{@last_id}" end - def trace(topic, span = nil, &block) - @tracer.trace(topic, span, &block) - end - - def trace_error(msg, span, &block) - @tracer.trace_error(msg, span, &block) - end - - # add: add a message to the stream # @param [Object] data Any data you want to transmit # @param [Hash{String->String}] Options # # no passthrough variable here. The passthrough is available in the start method def add(data = {}, options = {}) raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING) - default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "cache_key" => nil} + default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "cache_key" => nil, "tracer" => nil} options = default_options.merge(options) type = options["type"] to = options["to"] group = options["group"] @@ -102,21 +99,21 @@ # @param [Object] data Any data you want to transmit # @param [Hash{String->String}] Options def sync_add(data = {}, options = {}) raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING) - default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "time_out" => 5, "passthrough" => false, "cache_key" => nil} + default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "time_out" => 5, "passthrough" => false, "cache_key" => nil, "tracer" => nil} options = default_options.merge(options) to = options["to"] group = options["group"] passthrough = options["passthrough"] time_out = options["time_out"] #@state = Redis::Stream::State::RUNNING data_out = nil - add_id = add(data, "to" => to, "group" => group, "type" => options["type"], "cache_key" => options["cache_key"]) + add_id = add(data, "to" => to, "group" => group, "type" => options["type"], "cache_key" => options["cache_key"], "tracer" => options['tracer']) time = Time.now loop do timing = ((Time.now - time)).to_i @@ -186,18 +183,43 @@ end end end end + def trace(operation_name, parent_scope = nil) + # span = OpenTracing.start_span(operation_name) + # yield span if block_given? + # span.finish + # active_scope = OpenTracing.scope_manager.active + if parent_scope.is_a?(Zipkin::Scope) + OpenTracing.start_active_span(operation_name, child_of: parent_scope.span) do |scope| + yield scope if block_given? + end + else + OpenTracing.start_active_span(operation_name) do |scope| + yield scope if block_given? + end + end + end + + def trace_error(operation_name, span = nil) + trace(operation_name, span) + end private def build_payload(data, options) to = options['to'] group = options['group'] type = options['type'] - + tracer_data = {} + if options['tracer'].nil? + tracer_data = nil + else + tracer_span = options['tracer'] + OpenTracing.inject(tracer_span.context, OpenTracing::FORMAT_TEXT_MAP, tracer_data) + end payload = nil unless @cache.nil? if options["cache_key"].nil? cache_key = @cache.build_key(data) @@ -210,10 +232,11 @@ type: type, from: to, from_group: group, to: @consumer_id, to_group: @group, + tracer: tracer_data.to_json, payload: @cache[cache_key].to_json } @logger.info("#{@consumer_id} - fetching from cache with key #{cache_key}") end @@ -228,10 +251,11 @@ type: type, from: @consumer_id, from_group: @group, to: to, to_group: group, + tracer: tracer_data.to_json, payload: data.to_json } end payload end @@ -286,10 +310,16 @@ unless result.empty? id, data_out = result[@stream][0] ack_count = @redis.xack(@stream, @group, id) if @group + tracer_data = JSON.parse(data_out["tracer"]) + unless tracer_data.nil? || tracer_data.empty? + tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data) + data_out["tracer"] = OpenTracing.start_active_span(@consumer_id, child_of: tracer_span) + end + begin data_out["payload"] = JSON.parse(data_out["payload"]) rescue Exception => e @logger.error("#{@consumer_id} error parsing payload: #{e.message}") end @@ -301,10 +331,10 @@ # #@logger.info("#{@consumer_id} - ignoring self") # return false # end # end - if (data_out["from"].eql?(@consumer_id)) + if data_out["from"].eql?(@consumer_id) return false end unless (data_out["to"].nil? || data_out["to"].eql?('') || data_out["to"].eql?('*') || data_out["to"].eql?(@consumer_id)) && (data_out["to_group"].nil? || data_out["to_group"].eql?('') || data_out["to_group"].eql?('*') || data_out["to_group"].eql?(@group)) \ No newline at end of file