lib/redis/stream/client.rb in redis-stream-0.4.6 vs lib/redis/stream/client.rb in redis-stream-0.4.7
- old
+ new
@@ -1,7 +1,8 @@
#encoding: UTF-8
require "redis"
+require "connection_pool"
require "logger"
require "json"
require "thread"
require "redis/stream/inspect"
require "redis/stream/config"
@@ -44,23 +45,23 @@
@state = Redis::Stream::State::IDLE
@stream = stream_name
@group = group_name
if options.include?('redis')
@logger.info("Taking REDIS as a parameter")
- @redis = options['redis']
+ @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { options['redis'] }
elsif Redis::Stream::Config.file_exists? && Redis::Stream::Config.include?(:redis)
@logger.info("Taking REDIS from config file")
- @redis = Redis.new(Redis::Stream::Config[:redis])
+ @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { Redis.new(Redis::Stream::Config[:redis]) }
else
@logger.info("Instantiating REDIS")
- @redis = Redis.new(host: host, port: port, db: db)
+ @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { Redis.new(host: host, port: port, db: db) }
end
@consumer_id = "#{@name}-#{@group}-#{Process.pid}"
@non_blocking = nil
# @send_queue = []
- raise "No redis" if @redis.nil?
+ raise "No redis" if @redis_pool.nil? || @redis_pool.available == 0
if options.has_key?('tracer') && !options['tracer'].nil?
OpenTracing.global_tracer = options["tracer"]
elsif Redis::Stream::Config.include?(:zipkin)
OpenTracing.global_tracer = Zipkin::Tracer.build(url: Redis::Stream::Config[:zipkin], service_name: "#{@name}-#{@group}", flush_interval: 1)
@@ -87,11 +88,14 @@
type = options["type"]
to = options["to"]
group = options["group"]
payload = build_payload(data, options)
- add_id = @redis.xadd(@stream, payload)
+ add_id = nil
+ @redis_pool.with do |redis|
+ add_id = redis.xadd(@stream, payload)
+ end
@logger.info("#{@consumer_id} - send to '#{to}' in group '#{group}' with id '#{add_id}' of type '#{type}'")
end
add_id
end
@@ -263,11 +267,13 @@
#setup stream
def setup_stream
if @group
begin
- @redis.xgroup(:create, @stream, @group, '$', mkstream: true)
+ @redis_pool.with do |redis|
+ redis.xgroup(:create, @stream, @group, '$', mkstream: true)
+ end
@logger.info("#{@consumer_id} - Group #{@group} created")
rescue Redis::CommandError => e
@logger.error("#{@consumer_id} - Group #{@group} exists")
@logger.error("#{@consumer_id} - #{e.message}")
end
@@ -304,16 +310,20 @@
#read message from the stream
# @param [Boolean] async return the message if synchronous else call handle_incoming
# @param [Boolean] passthrough Receive all messages also the ones intended for other consumers
def read_next_message_from_stream(async = true, passthrough = false)
if @state == Redis::Stream::State::RUNNING
- result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil?
- result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group
-
+ result = nil
+ @redis_pool.with do |redis|
+ result = redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil?
+ result = redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group
+ end
unless result.empty?
id, data_out = result[@stream][0]
- ack_count = @redis.xack(@stream, @group, id) if @group
-
+ ack_count = 0
+ @redis_pool.with do |redis|
+ ack_count = redis.xack(@stream, @group, id) if @group
+ end
tracer_data = JSON.parse(data_out["tracer"])
unless tracer_data.nil? || tracer_data.empty?
#trace_scope.span.log_kv('has_tracer' => true)
tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data)
data_out["tracer"] = OpenTracing.start_span(@consumer_id, child_of: tracer_span)
\ No newline at end of file