lib/redis/stream/client.rb in redis-stream-0.3.0 vs lib/redis/stream/client.rb in redis-stream-0.4.0
- old
+ new
@@ -4,37 +4,44 @@
require "json"
require "thread"
require "redis/stream/inspect"
require "redis/stream/config"
require "redis/stream/data_cache"
+require "redis/stream/tracer/zipkin_tracer"
class Redis
module Stream
class Client
include Redis::Stream::Inspect
- attr_reader :logger, :name, :group, :consumer_id, :cache, :redis, :non_blocking
+ attr_reader :logger, :name, :group, :stream, :consumer_id, :cache, :redis, :non_blocking
# Initialize: setup rstream
# @param [String] stream_name name of the rstream
- # @param [String] group name of the rstream group
+ # @param [String] group_name name of the rstream group
+ # @param [String] name name of the Stream Client
# @param [Object] options options can contain redis[host, port, db] and logger keys
#
# Example: Redis::Stream::Client.new("resolver", "stream", {"logger" => Logger.new(STDOUT)})
# if group is nil or not supplied then no rstream group will be setup
def initialize(stream_name, group_name = nil, name = rand(36 ** 7).to_s(36), options = {})
- default_options = {"host" => "127.0.0.1", "port" => 6379, "db" => 0, "config_file_path" => '.', "logger" => Logger.new(STDOUT)}
+ default_options = {"host" => "127.0.0.1",
+ "port" => 6379,
+ "db" => 0,
+ "config_file_path" => '.',
+ "logger" => Logger.new(STDOUT),
+ "zipkin_config" => {}}
options = default_options.merge(options)
Redis::Stream::Config.path = options['config_file_path']
host = options["host"]
port = options["port"]
db = options["db"]
@logger = options["logger"]
@cache = options.include?('caching') && options['caching'] ? Redis::Stream::DataCache.new : nil
-
+ @tracer = ZipkinTracer::RedisStreamHandler.new(self, options["zipkin_config"])
@name = name
@state = Redis::Stream::State::IDLE
@stream = stream_name
@group = group_name
if options.include?('redis')
@@ -58,16 +65,22 @@
@last_id = info['last-generated-id'] rescue '0'
@logger.info "#{@consumer_id} - Last ID = #{@last_id}"
end
+ def trace(topic, span = nil, &block)
+ @tracer.trace(topic, span, &block)
+ end
+ def trace_error(msg, span, &block)
+ @tracer.trace_error(msg, span, &block)
+ end
+
+
# add: add a message to the stream
# @param [Object] data Any data you want to transmit
- # @param [String] to Name of the consumer can be "*" or "" or nil for any consumer
- # @param [String] group Name of the consumer group can be "*" or "" or nil for any group
- # @param [Stream::Type] type Type of message
+ # @param [Hash{String->String}] Options
#
# no passthrough variable here. The passthrough is available in the start method
def add(data = {}, options = {})
raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING)
@@ -85,14 +98,10 @@
add_id
end
# sync_add: same as add command but synchronous. Blocks call until a message arrives
# @param [Object] data Any data you want to transmit
- # @param [String] to Name of the consumer can be "*" or "" or nil for any consumer
- # @param [String] group Name of the consumer group can be "*" or "" or nil for any group
- # @param [Stream::Type] type Type of message
- # @param [Integer] time_out Time out after x seconds
- # @param [Boolean] passthrough Receive all messages also the ones intended for other consumers
+ # @param [Hash{String->String}] Options
def sync_add(data = {}, options = {})
raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING)
default_options = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "time_out" => 5, "passthrough" => false, "cache_key" => nil}
options = default_options.merge(options)
\ No newline at end of file