lib/redis/stream/client.rb in redis-stream-0.4.6 vs lib/redis/stream/client.rb in redis-stream-0.4.7

- old
+ new

@@ -1,7 +1,8 @@ #encoding: UTF-8 require "redis" +require "connection_pool" require "logger" require "json" require "thread" require "redis/stream/inspect" require "redis/stream/config" @@ -44,23 +45,23 @@ @state = Redis::Stream::State::IDLE @stream = stream_name @group = group_name if options.include?('redis') @logger.info("Taking REDIS as a parameter") - @redis = options['redis'] + @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { options['redis'] } elsif Redis::Stream::Config.file_exists? && Redis::Stream::Config.include?(:redis) @logger.info("Taking REDIS from config file") - @redis = Redis.new(Redis::Stream::Config[:redis]) + @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { Redis.new(Redis::Stream::Config[:redis]) } else @logger.info("Instantiating REDIS") - @redis = Redis.new(host: host, port: port, db: db) + @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { Redis.new(host: host, port: port, db: db) } end @consumer_id = "#{@name}-#{@group}-#{Process.pid}" @non_blocking = nil # @send_queue = [] - raise "No redis" if @redis.nil? + raise "No redis" if @redis_pool.nil? || @redis_pool.available == 0 if options.has_key?('tracer') && !options['tracer'].nil? OpenTracing.global_tracer = options["tracer"] elsif Redis::Stream::Config.include?(:zipkin) OpenTracing.global_tracer = Zipkin::Tracer.build(url: Redis::Stream::Config[:zipkin], service_name: "#{@name}-#{@group}", flush_interval: 1) @@ -87,11 +88,14 @@ type = options["type"] to = options["to"] group = options["group"] payload = build_payload(data, options) - add_id = @redis.xadd(@stream, payload) + add_id = nil + @redis_pool.with do |redis| + add_id = redis.xadd(@stream, payload) + end @logger.info("#{@consumer_id} - send to '#{to}' in group '#{group}' with id '#{add_id}' of type '#{type}'") end add_id end @@ -263,11 +267,13 @@ #setup stream def setup_stream if @group begin - @redis.xgroup(:create, @stream, @group, '$', mkstream: true) + @redis_pool.with do |redis| + redis.xgroup(:create, @stream, @group, '$', mkstream: true) + end @logger.info("#{@consumer_id} - Group #{@group} created") rescue Redis::CommandError => e @logger.error("#{@consumer_id} - Group #{@group} exists") @logger.error("#{@consumer_id} - #{e.message}") end @@ -304,16 +310,20 @@ #read message from the stream # @param [Boolean] async return the message if synchronous else call handle_incoming # @param [Boolean] passthrough Receive all messages also the ones intended for other consumers def read_next_message_from_stream(async = true, passthrough = false) if @state == Redis::Stream::State::RUNNING - result = @redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil? - result = @redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group - + result = nil + @redis_pool.with do |redis| + result = redis.xread(@stream, @lastid, block: 1000, count: 1) if @group.nil? + result = redis.xreadgroup(@group, @consumer_id, @stream, '>', block: 1000, count: 1) if @group + end unless result.empty? id, data_out = result[@stream][0] - ack_count = @redis.xack(@stream, @group, id) if @group - + ack_count = 0 + @redis_pool.with do |redis| + ack_count = redis.xack(@stream, @group, id) if @group + end tracer_data = JSON.parse(data_out["tracer"]) unless tracer_data.nil? || tracer_data.empty? #trace_scope.span.log_kv('has_tracer' => true) tracer_span = OpenTracing.extract(OpenTracing::FORMAT_TEXT_MAP, tracer_data) data_out["tracer"] = OpenTracing.start_span(@consumer_id, child_of: tracer_span) \ No newline at end of file