lib/ldclient-rb/stream.rb in launchdarkly-server-sdk-5.6.2 vs lib/ldclient-rb/stream.rb in launchdarkly-server-sdk-5.7.0

- old
+ new

@@ -22,19 +22,20 @@ SEGMENTS => "/segments/" } # @private class StreamProcessor - def initialize(sdk_key, config, requestor) + def initialize(sdk_key, config, requestor, diagnostic_accumulator = nil) @sdk_key = sdk_key @config = config @feature_store = config.feature_store @requestor = requestor @initialized = Concurrent::AtomicBoolean.new(false) @started = Concurrent::AtomicBoolean.new(false) @stopped = Concurrent::AtomicBoolean.new(false) @ready = Concurrent::Event.new + @connection_attempt_start_time = 0 end def initialized? @initialized.value end @@ -42,22 +43,21 @@ def start return @ready unless @started.make_true @config.logger.info { "[LDClient] Initializing stream connection" } - headers = { - 'Authorization' => @sdk_key, - 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION - } + headers = Impl::Util.default_http_headers(@sdk_key, @config) opts = { headers: headers, read_timeout: READ_TIMEOUT_SECONDS, logger: @config.logger } + log_connection_started @es = SSE::Client.new(@config.stream_uri + "/all", **opts) do |conn| conn.on_event { |event| process_message(event) } conn.on_error { |err| + log_connection_result(false) case err when SSE::Errors::HTTPStatusError status = err.status message = Util.http_error_message(status, "streaming connection", "will retry") @config.logger.error { "[LDClient] #{message}" } @@ -80,10 +80,11 @@ end private def process_message(message) + log_connection_result(true) method = message.type @config.logger.debug { "[LDClient] Stream received #{method} message: #{message.data}" } if method == PUT message = JSON.parse(message.data, symbolize_names: true) @feature_store.init({ @@ -134,8 +135,20 @@ end end def key_for_path(kind, path) path.start_with?(KEY_PATHS[kind]) ? path[KEY_PATHS[kind].length..-1] : nil + end + + def log_connection_started + @connection_attempt_start_time = Impl::Util::current_time_millis + end + + def log_connection_result(is_success) + if !@diagnostic_accumulator.nil? && @connection_attempt_start_time > 0 + @diagnostic_accumulator.record_stream_init(@connection_attempt_start_time, !is_success, + Impl::Util::current_time_millis - @connection_attempt_start_time) + @connection_attempt_start_time = 0 + end end end end