lib/redis/stream/client.rb in redis-stream-0.3.0 vs lib/redis/stream/client.rb in redis-stream-0.4.0

- old
+ new

@@ -4,37 +4,44 @@ require "json" require "thread" require "redis/stream/inspect" require "redis/stream/config" require "redis/stream/data_cache" +require "redis/stream/tracer/zipkin_tracer" class Redis module Stream class Client include Redis::Stream::Inspect - attr_reader :logger, :name, :group, :consumer_id, :cache, :redis, :non_blocking + attr_reader :logger, :name, :group, :stream, :consumer_id, :cache, :redis, :non_blocking # Initialize: setup rstream # @param [String] stream_name name of the rstream - # @param [String] group name of the rstream group + # @param [String] group_name name of the rstream group + # @param [String] name name of the Stream Client # @param [Object] options options can contain redis[host, port, db] and logger keys # # Example: Redis::Stream::Client.new("resolver", "stream", {"logger" => Logger.new(STDOUT)}) # if group is nil or not supplied then no rstream group will be setup def initialize(stream_name, group_name = nil, name = rand(36 ** 7).to_s(36), options = {}) - default_options = {"host" => "127.0.0.1", "port" => 6379, "db" => 0, "config_file_path" => '.', "logger" => Logger.new(STDOUT)} + default_options = {"host" => "127.0.0.1", + "port" => 6379, + "db" => 0, + "config_file_path" => '.', + "logger" => Logger.new(STDOUT), + "zipkin_config" => {}} options = default_options.merge(options) Redis::Stream::Config.path = options['config_file_path'] host = options["host"] port = options["port"] db = options["db"] @logger = options["logger"] @cache = options.include?('caching') && options['caching'] ? Redis::Stream::DataCache.new : nil - + @tracer = ZipkinTracer::RedisStreamHandler.new(self, options["zipkin_config"]) @name = name @state = Redis::Stream::State::IDLE @stream = stream_name @group = group_name if options.include?('redis') @@ -58,16 +65,22 @@ @last_id = info['last-generated-id'] rescue '0' @logger.info "#{@consumer_id} - Last ID = #{@last_id}" end + def trace(topic, span = nil, &block) + @tracer.trace(topic, span, &block) + end + def trace_error(msg, span, &block) + @tracer.trace_error(msg, span, &block) + end + + # add: add a message to the stream # @param [Object] data Any data you want to transmit - # @param [String] to Name of the consumer can be "*" or "" or nil for any consumer - # @param [String] group Name of the consumer group can be "*" or "" or nil for any group - # @param [Stream::Type] type Type of message + # @param [Hash{String->String}] Options # # no passthrough variable here. The passthrough is available in the start method def add(data = {}, options = {}) raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING) @@ -85,14 +98,10 @@ add_id end # sync_add: same as add command but synchronous. Blocks call until a message arrives # @param [Object] data Any data you want to transmit - # @param [String] to Name of the consumer can be "*" or "" or nil for any consumer - # @param [String] group Name of the consumer group can be "*" or "" or nil for any group - # @param [Stream::Type] type Type of message - # @param [Integer] time_out Time out after x seconds - # @param [Boolean] passthrough Receive all messages also the ones intended for other consumers + # @param [Hash{String->String}] Options def sync_add(data = {}, options = {}) raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING) default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "time_out" => 5, "passthrough" => false, "cache_key" => nil} options = default_options.merge(options) \ No newline at end of file