# frozen_string_literal: true require 'redis/connection/ruby' require 'json' module OpenTracing module Instrumentation module Redis # TracingDriverWrapper wrap redis driver class TracingDriverWrapper extend Forwardable class << self DEAFULT_BACKGROUND_DRIVER = ::Redis::Connection::Ruby def connect(options) connection = connect_backround_driver(options) span_builder = options.fetch(:span_builder, SpanBuilder.new) new( connection: connection, span_builder: span_builder, host: options[:host], port: options[:port], ) end private def connect_backround_driver(options) background_driver = options.fetch(:background_driver, DEAFULT_BACKGROUND_DRIVER) background_driver.connect(options) end end attr_reader :connection, :span_builder def initialize( connection:, span_builder:, host:, port: ) @connection = connection @span_builder = span_builder @host = host @port = port end def_delegators :connection, :connected?, :disconnect, :timeout= def write(command) scope = start_pipeline_scope(command) connection.write(command).tap do span_builder.write_log_command(scope.span, command) if scope end rescue StandardError => e span_builder.write_error(scope.span, e, event: EVENT_WRITE) if scope close_pipeline_scope raise end def read scope = pipeline_scope connection.read.tap do |reply| span_builder.write_log_reply(scope.span, reply) if scope end rescue StandardError => e span_builder.write_error(scope.span, e, event: EVENT_READ) if scope raise ensure close_pipeline_scope end private attr_reader :pipeline_scope, :error_writer def peer_addr "#{@host}:#{@port}" end def start_pipeline_scope(command) if @pipeline_scope @pipeline_queue_size += 1 return @pipeline_scope end new_pipeline_scope(command) end def new_pipeline_scope(command) @pipeline_queue_size = 1 @pipeline_scope = span_builder.start_active_scope( command, connection.class, peer_addr, ) end def close_pipeline_scope return unless @pipeline_scope @pipeline_queue_size -= 1 return if @pipeline_queue_size.positive? @pipeline_scope.close @pipeline_scope = nil end end end end end