lib/ldclient-rb/events.rb in launchdarkly-server-sdk-6.4.0 vs lib/ldclient-rb/events.rb in launchdarkly-server-sdk-7.0.0

- old
+ new

@@ -1,5 +1,6 @@ +require "ldclient-rb/impl/context_filter" require "ldclient-rb/impl/diagnostic_events" require "ldclient-rb/impl/event_sender" require "ldclient-rb/impl/event_summarizer" require "ldclient-rb/impl/event_types" require "ldclient-rb/impl/util" @@ -18,21 +19,21 @@ # the event in a bounded queue, the "inbox", and immediately returns. # # On a separate worker thread, EventDispatcher consumes events from the inbox. These are considered # "input events" because they may or may not actually be sent to LaunchDarkly; most flag evaluation # events are not sent, but are counted and the counters become part of a single summary event. -# EventDispatcher updates those counters, creates "index" events for any users that have not been seen +# EventDispatcher updates those counters, creates "index" events for any contexts that have not been seen # recently, and places any events that will be sent to LaunchDarkly into the "outbox" queue. # # When it is time to flush events to LaunchDarkly, the contents of the outbox are handed off to # another worker thread which sends the HTTP request. # module LaunchDarkly module EventProcessorMethods def record_eval_event( - user, + context, key, version = nil, variation = nil, value = nil, reason = nil, @@ -41,37 +42,30 @@ debug_until = nil, prereq_of = nil ) end - def record_identify_event(user) + def record_identify_event(context) end def record_custom_event( - user, + context, key, data = nil, metric_value = nil ) end - def record_alias_event(user, previous_user) - end - def flush end def stop end end MAX_FLUSH_WORKERS = 5 - USER_ATTRS_TO_STRINGIFY_FOR_EVENTS = [ :key, :secondary, :ip, :country, :email, :firstName, :lastName, - :avatar, :name ] - private_constant :MAX_FLUSH_WORKERS - private_constant :USER_ATTRS_TO_STRINGIFY_FOR_EVENTS # @private class NullEventProcessor include EventProcessorMethods end @@ -79,11 +73,11 @@ # @private class FlushMessage end # @private - class FlushUsersMessage + class FlushContextsMessage end # @private class DiagnosticEventMessage end @@ -91,11 +85,11 @@ # @private class SynchronousMessage def initialize @reply = Concurrent::Semaphore.new(0) end - + def completed @reply.release end def wait_for_completion @@ -121,14 +115,14 @@ @inbox = SizedQueue.new(config.capacity < 100 ? 100 : config.capacity) @flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) do post_to_inbox(FlushMessage.new) end @flush_task.execute - @users_flush_task = Concurrent::TimerTask.new(execution_interval: config.user_keys_flush_interval) do - post_to_inbox(FlushUsersMessage.new) + @contexts_flush_task = Concurrent::TimerTask.new(execution_interval: config.context_keys_flush_interval) do + post_to_inbox(FlushContextsMessage.new) end - @users_flush_task.execute + @contexts_flush_task.execute if !diagnostic_accumulator.nil? interval = test_properties && test_properties.has_key?(:diagnostic_recording_interval) ? test_properties[:diagnostic_recording_interval] : config.diagnostic_recording_interval @diagnostic_event_task = Concurrent::TimerTask.new(execution_interval: interval) do @@ -140,62 +134,52 @@ end @stopped = Concurrent::AtomicBoolean.new(false) @inbox_full = Concurrent::AtomicBoolean.new(false) event_sender = (test_properties || {})[:event_sender] || - Impl::EventSender.new(sdk_key, config, client ? client : Util.new_http_client(config.events_uri, config)) + Impl::EventSender.new(sdk_key, config, client || Util.new_http_client(config.events_uri, config)) @timestamp_fn = (test_properties || {})[:timestamp_fn] || proc { Impl::Util.current_time_millis } EventDispatcher.new(@inbox, sdk_key, config, diagnostic_accumulator, event_sender) end def record_eval_event( - user, + context, key, version = nil, variation = nil, value = nil, reason = nil, default = nil, track_events = false, debug_until = nil, prereq_of = nil ) - post_to_inbox(LaunchDarkly::Impl::EvalEvent.new(timestamp, user, key, version, variation, value, reason, + post_to_inbox(LaunchDarkly::Impl::EvalEvent.new(timestamp, context, key, version, variation, value, reason, default, track_events, debug_until, prereq_of)) end - def record_identify_event(user) - post_to_inbox(LaunchDarkly::Impl::IdentifyEvent.new(timestamp, user)) + def record_identify_event(context) + post_to_inbox(LaunchDarkly::Impl::IdentifyEvent.new(timestamp, context)) end - def record_custom_event(user, key, data = nil, metric_value = nil) - post_to_inbox(LaunchDarkly::Impl::CustomEvent.new(timestamp, user, key, data, metric_value)) + def record_custom_event(context, key, data = nil, metric_value = nil) + post_to_inbox(LaunchDarkly::Impl::CustomEvent.new(timestamp, context, key, data, metric_value)) end - def record_alias_event(user, previous_user) - post_to_inbox(LaunchDarkly::Impl::AliasEvent.new( - timestamp, - user.nil? ? nil : user[:key], - user_to_context_kind(user), - previous_user.nil? ? nil : previous_user[:key], - user_to_context_kind(previous_user) - )) - end - def flush # flush is done asynchronously post_to_inbox(FlushMessage.new) end def stop # final shutdown, which includes a final flush, is done synchronously if @stopped.make_true @flush_task.shutdown - @users_flush_task.shutdown - @diagnostic_event_task.shutdown if !@diagnostic_event_task.nil? + @contexts_flush_task.shutdown + @diagnostic_event_task.shutdown unless @diagnostic_event_task.nil? # Note that here we are not calling post_to_inbox, because we *do* want to wait if the inbox # is full; an orderly shutdown can't happen unless these messages are received. @inbox << FlushMessage.new stop_msg = StopMessage.new @inbox << stop_msg @@ -225,31 +209,27 @@ if @inbox_full.make_true @logger.warn { "[LDClient] Events are being produced faster than they can be processed; some events will be dropped" } end end end - - private def user_to_context_kind(user) - (user.nil? || !user[:anonymous]) ? 'user' : 'anonymousUser' - end end # @private class EventDispatcher def initialize(inbox, sdk_key, config, diagnostic_accumulator, event_sender) @sdk_key = sdk_key @config = config @diagnostic_accumulator = config.diagnostic_opt_out? ? nil : diagnostic_accumulator @event_sender = event_sender - @user_keys = SimpleLRUCacheSet.new(config.user_keys_capacity) + @context_keys = SimpleLRUCacheSet.new(config.context_keys_capacity) @formatter = EventOutputFormatter.new(config) @disabled = Concurrent::AtomicBoolean.new(false) @last_known_past_time = Concurrent::AtomicReference.new(0) - @deduplicated_users = 0 + @deduplicated_contexts = 0 @events_in_last_batch = 0 - + outbox = EventBuffer.new(config.capacity, config.logger) flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS) if !@diagnostic_accumulator.nil? diagnostic_event_workers = NonBlockingThreadPool.new(1) @@ -270,12 +250,12 @@ begin message = inbox.pop case message when FlushMessage trigger_flush(outbox, flush_workers) - when FlushUsersMessage - @user_keys.clear + when FlushContextsMessage + @context_keys.clear when DiagnosticEventMessage send_and_reset_diagnostics(outbox, diagnostic_event_workers) when TestSyncMessage synchronize_for_testing(flush_workers, diagnostic_event_workers) message.completed @@ -293,21 +273,21 @@ end def do_shutdown(flush_workers, diagnostic_event_workers) flush_workers.shutdown flush_workers.wait_for_termination - if !diagnostic_event_workers.nil? + unless diagnostic_event_workers.nil? diagnostic_event_workers.shutdown diagnostic_event_workers.wait_for_termination end @event_sender.stop if @event_sender.respond_to?(:stop) end def synchronize_for_testing(flush_workers, diagnostic_event_workers) # Used only by unit tests. Wait until all active flush workers have finished. flush_workers.wait_all - diagnostic_event_workers.wait_all if !diagnostic_event_workers.nil? + diagnostic_event_workers.wait_all unless diagnostic_event_workers.nil? end def dispatch_event(event, outbox) return if @disabled.value @@ -325,31 +305,30 @@ end else will_add_full_event = true end - # For each user we haven't seen before, we add an index event - unless this is already - # an identify event for that user. - if !(will_add_full_event && @config.inline_users_in_events) - if !event.user.nil? && !notice_user(event.user) && !event.is_a?(LaunchDarkly::Impl::IdentifyEvent) - outbox.add_event(LaunchDarkly::Impl::IndexEvent.new(event.timestamp, event.user)) - end + # For each context we haven't seen before, we add an index event - unless this is already + # an identify event for that context. + if !event.context.nil? && !notice_context(event.context) && !event.is_a?(LaunchDarkly::Impl::IdentifyEvent) + outbox.add_event(LaunchDarkly::Impl::IndexEvent.new(event.timestamp, event.context)) end outbox.add_event(event) if will_add_full_event - outbox.add_event(debug_event) if !debug_event.nil? + outbox.add_event(debug_event) unless debug_event.nil? end - # Add to the set of users we've noticed, and return true if the user was already known to us. - def notice_user(user) - if user.nil? || !user.has_key?(:key) - true - else - known = @user_keys.add(user[:key].to_s) - @deduplicated_users += 1 if known - known - end + # + # Add to the set of contexts we've noticed, and return true if the context + # was already known to us. + # @param context [LaunchDarkly::LDContext] + # @return [Boolean] + # + def notice_context(context) + known = @context_keys.add(context.fully_qualified_key) + @deduplicated_contexts += 1 if known + known end def should_debug_event(event) debug_until = event.debug_until if !debug_until.nil? @@ -363,21 +342,21 @@ def trigger_flush(outbox, flush_workers) if @disabled.value return end - payload = outbox.get_payload + payload = outbox.get_payload if !payload.events.empty? || !payload.summary.counters.empty? count = payload.events.length + (payload.summary.counters.empty? ? 0 : 1) @events_in_last_batch = count # If all available worker threads are busy, success will be false and no job will be queued. success = flush_workers.post do begin events_out = @formatter.make_output_events(payload.events, payload.summary) result = @event_sender.send_event_data(events_out.to_json, "#{events_out.length} events", false) @disabled.value = true if result.must_shutdown - if !result.time_from_server.nil? + unless result.time_from_server.nil? @last_known_past_time.value = (result.time_from_server.to_f * 1000).to_i end rescue => e Util.log_exception(@config.logger, "Unexpected error in event processor", e) end @@ -389,12 +368,12 @@ end def send_and_reset_diagnostics(outbox, diagnostic_event_workers) return if @diagnostic_accumulator.nil? dropped_count = outbox.get_and_clear_dropped_count - event = @diagnostic_accumulator.create_periodic_event_and_reset(dropped_count, @deduplicated_users, @events_in_last_batch) - @deduplicated_users = 0 + event = @diagnostic_accumulator.create_periodic_event_and_reset(dropped_count, @deduplicated_contexts, @events_in_last_batch) + @deduplicated_contexts = 0 @events_in_last_batch = 0 send_diagnostic_event(event, diagnostic_event_workers) end def send_diagnostic_event(event, diagnostic_event_workers) @@ -428,11 +407,11 @@ if @events.length < @capacity @events.push(event) @capacity_exceeded = false else @dropped_events += 1 - if !@capacity_exceeded + unless @capacity_exceeded @capacity_exceeded = true @logger.warn { "[LDClient] Exceeded event queue capacity. Increase capacity to avoid dropping events." } end end end @@ -440,11 +419,11 @@ def add_to_summary(event) @summarizer.summarize_event(event) end def get_payload - return FlushPayload.new(@events, @summarizer.snapshot) + FlushPayload.new(@events, @summarizer.snapshot) end def get_and_clear_dropped_count ret = @dropped_events @dropped_events = 0 @@ -460,101 +439,85 @@ # @private class EventOutputFormatter FEATURE_KIND = 'feature' IDENTIFY_KIND = 'identify' CUSTOM_KIND = 'custom' - ALIAS_KIND = 'alias' INDEX_KIND = 'index' DEBUG_KIND = 'debug' SUMMARY_KIND = 'summary' - ANONYMOUS_USER_CONTEXT_KIND = 'anonymousUser' def initialize(config) - @inline_users = config.inline_users_in_events - @user_filter = UserFilter.new(config) + @context_filter = LaunchDarkly::Impl::ContextFilter.new(config.all_attributes_private, config.private_attributes) end # Transforms events into the format used for event sending. def make_output_events(events, summary) events_out = events.map { |e| make_output_event(e) } - if !summary.counters.empty? + unless summary.counters.empty? events_out.push(make_summary_event(summary)) end events_out end private def make_output_event(event) case event - + when LaunchDarkly::Impl::EvalEvent out = { kind: FEATURE_KIND, creationDate: event.timestamp, key: event.key, - value: event.value + value: event.value, } - out[:default] = event.default if !event.default.nil? - out[:variation] = event.variation if !event.variation.nil? - out[:version] = event.version if !event.version.nil? - out[:prereqOf] = event.prereq_of if !event.prereq_of.nil? - set_opt_context_kind(out, event.user) - set_user_or_user_key(out, event.user) - out[:reason] = event.reason if !event.reason.nil? + out[:default] = event.default unless event.default.nil? + out[:variation] = event.variation unless event.variation.nil? + out[:version] = event.version unless event.version.nil? + out[:prereqOf] = event.prereq_of unless event.prereq_of.nil? + out[:contextKeys] = event.context.keys + out[:reason] = event.reason unless event.reason.nil? out when LaunchDarkly::Impl::IdentifyEvent { kind: IDENTIFY_KIND, creationDate: event.timestamp, - key: event.user[:key].to_s, - user: process_user(event.user) + key: event.context.fully_qualified_key, + context: @context_filter.filter(event.context), } - + when LaunchDarkly::Impl::CustomEvent out = { kind: CUSTOM_KIND, creationDate: event.timestamp, - key: event.key + key: event.key, } - out[:data] = event.data if !event.data.nil? - set_user_or_user_key(out, event.user) - out[:metricValue] = event.metric_value if !event.metric_value.nil? - set_opt_context_kind(out, event.user) + out[:data] = event.data unless event.data.nil? + out[:contextKeys] = event.context.keys + out[:metricValue] = event.metric_value unless event.metric_value.nil? out - when LaunchDarkly::Impl::AliasEvent - { - kind: ALIAS_KIND, - creationDate: event.timestamp, - key: event.key, - contextKind: event.context_kind, - previousKey: event.previous_key, - previousContextKind: event.previous_context_kind - } - when LaunchDarkly::Impl::IndexEvent { kind: INDEX_KIND, creationDate: event.timestamp, - user: process_user(event.user) + context: @context_filter.filter(event.context), } - + when LaunchDarkly::Impl::DebugEvent original = event.eval_event out = { kind: DEBUG_KIND, creationDate: original.timestamp, key: original.key, - user: process_user(original.user), - value: original.value + context: @context_filter.filter(original.context), + value: original.value, } - out[:default] = original.default if !original.default.nil? - out[:variation] = original.variation if !original.variation.nil? - out[:version] = original.version if !original.version.nil? - out[:prereqOf] = original.prereq_of if !original.prereq_of.nil? - set_opt_context_kind(out, original.user) - out[:reason] = original.reason if !original.reason.nil? + out[:default] = original.default unless original.default.nil? + out[:variation] = original.variation unless original.variation.nil? + out[:version] = original.version unless original.version.nil? + out[:prereqOf] = original.prereq_of unless original.prereq_of.nil? + out[:reason] = original.reason unless original.reason.nil? out else nil end @@ -567,45 +530,27 @@ counters = [] flagInfo.versions.each do |version, variations| variations.each do |variation, counter| c = { value: counter.value, - count: counter.count + count: counter.count, } - c[:variation] = variation if !variation.nil? + c[:variation] = variation unless variation.nil? if version.nil? c[:unknown] = true else c[:version] = version end counters.push(c) end end - flags[flagKey] = { default: flagInfo.default, counters: counters } + flags[flagKey] = { default: flagInfo.default, counters: counters, contextKinds: flagInfo.context_kinds.to_a } end { kind: SUMMARY_KIND, startDate: summary[:start_date], endDate: summary[:end_date], - features: flags + features: flags, } - end - - private def set_opt_context_kind(out, user) - out[:contextKind] = ANONYMOUS_USER_CONTEXT_KIND if !user.nil? && user[:anonymous] - end - - private def set_user_or_user_key(out, user) - if @inline_users - out[:user] = process_user(user) - else - key = user[:key] - out[:userKey] = key.is_a?(String) ? key : key.to_s - end - end - - private def process_user(user) - filtered = @user_filter.transform_user_props(user) - Util.stringify_attrs(filtered, USER_ATTRS_TO_STRINGIFY_FOR_EVENTS) end end end