lib/ldclient-rb/stream.rb in launchdarkly-server-sdk-5.6.2 vs lib/ldclient-rb/stream.rb in launchdarkly-server-sdk-5.7.0
- old
+ new
@@ -22,19 +22,20 @@
SEGMENTS => "/segments/"
}
# @private
class StreamProcessor
- def initialize(sdk_key, config, requestor)
+ def initialize(sdk_key, config, requestor, diagnostic_accumulator = nil)
@sdk_key = sdk_key
@config = config
@feature_store = config.feature_store
@requestor = requestor
@initialized = Concurrent::AtomicBoolean.new(false)
@started = Concurrent::AtomicBoolean.new(false)
@stopped = Concurrent::AtomicBoolean.new(false)
@ready = Concurrent::Event.new
+ @connection_attempt_start_time = 0
end
def initialized?
@initialized.value
end
@@ -42,22 +43,21 @@
def start
return @ready unless @started.make_true
@config.logger.info { "[LDClient] Initializing stream connection" }
- headers = {
- 'Authorization' => @sdk_key,
- 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION
- }
+ headers = Impl::Util.default_http_headers(@sdk_key, @config)
opts = {
headers: headers,
read_timeout: READ_TIMEOUT_SECONDS,
logger: @config.logger
}
+ log_connection_started
@es = SSE::Client.new(@config.stream_uri + "/all", **opts) do |conn|
conn.on_event { |event| process_message(event) }
conn.on_error { |err|
+ log_connection_result(false)
case err
when SSE::Errors::HTTPStatusError
status = err.status
message = Util.http_error_message(status, "streaming connection", "will retry")
@config.logger.error { "[LDClient] #{message}" }
@@ -80,10 +80,11 @@
end
private
def process_message(message)
+ log_connection_result(true)
method = message.type
@config.logger.debug { "[LDClient] Stream received #{method} message: #{message.data}" }
if method == PUT
message = JSON.parse(message.data, symbolize_names: true)
@feature_store.init({
@@ -134,8 +135,20 @@
end
end
def key_for_path(kind, path)
path.start_with?(KEY_PATHS[kind]) ? path[KEY_PATHS[kind].length..-1] : nil
+ end
+
+ def log_connection_started
+ @connection_attempt_start_time = Impl::Util::current_time_millis
+ end
+
+ def log_connection_result(is_success)
+ if !@diagnostic_accumulator.nil? && @connection_attempt_start_time > 0
+ @diagnostic_accumulator.record_stream_init(@connection_attempt_start_time, !is_success,
+ Impl::Util::current_time_millis - @connection_attempt_start_time)
+ @connection_attempt_start_time = 0
+ end
end
end
end