lib/ldclient-rb/events.rb in launchdarkly-server-sdk-6.4.0 vs lib/ldclient-rb/events.rb in launchdarkly-server-sdk-7.0.0
- old
+ new
@@ -1,5 +1,6 @@
+require "ldclient-rb/impl/context_filter"
require "ldclient-rb/impl/diagnostic_events"
require "ldclient-rb/impl/event_sender"
require "ldclient-rb/impl/event_summarizer"
require "ldclient-rb/impl/event_types"
require "ldclient-rb/impl/util"
@@ -18,21 +19,21 @@
# the event in a bounded queue, the "inbox", and immediately returns.
#
# On a separate worker thread, EventDispatcher consumes events from the inbox. These are considered
# "input events" because they may or may not actually be sent to LaunchDarkly; most flag evaluation
# events are not sent, but are counted and the counters become part of a single summary event.
-# EventDispatcher updates those counters, creates "index" events for any users that have not been seen
+# EventDispatcher updates those counters, creates "index" events for any contexts that have not been seen
# recently, and places any events that will be sent to LaunchDarkly into the "outbox" queue.
#
# When it is time to flush events to LaunchDarkly, the contents of the outbox are handed off to
# another worker thread which sends the HTTP request.
#
module LaunchDarkly
module EventProcessorMethods
def record_eval_event(
- user,
+ context,
key,
version = nil,
variation = nil,
value = nil,
reason = nil,
@@ -41,37 +42,30 @@
debug_until = nil,
prereq_of = nil
)
end
- def record_identify_event(user)
+ def record_identify_event(context)
end
def record_custom_event(
- user,
+ context,
key,
data = nil,
metric_value = nil
)
end
- def record_alias_event(user, previous_user)
- end
-
def flush
end
def stop
end
end
MAX_FLUSH_WORKERS = 5
- USER_ATTRS_TO_STRINGIFY_FOR_EVENTS = [ :key, :secondary, :ip, :country, :email, :firstName, :lastName,
- :avatar, :name ]
-
private_constant :MAX_FLUSH_WORKERS
- private_constant :USER_ATTRS_TO_STRINGIFY_FOR_EVENTS
# @private
class NullEventProcessor
include EventProcessorMethods
end
@@ -79,11 +73,11 @@
# @private
class FlushMessage
end
# @private
- class FlushUsersMessage
+ class FlushContextsMessage
end
# @private
class DiagnosticEventMessage
end
@@ -91,11 +85,11 @@
# @private
class SynchronousMessage
def initialize
@reply = Concurrent::Semaphore.new(0)
end
-
+
def completed
@reply.release
end
def wait_for_completion
@@ -121,14 +115,14 @@
@inbox = SizedQueue.new(config.capacity < 100 ? 100 : config.capacity)
@flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) do
post_to_inbox(FlushMessage.new)
end
@flush_task.execute
- @users_flush_task = Concurrent::TimerTask.new(execution_interval: config.user_keys_flush_interval) do
- post_to_inbox(FlushUsersMessage.new)
+ @contexts_flush_task = Concurrent::TimerTask.new(execution_interval: config.context_keys_flush_interval) do
+ post_to_inbox(FlushContextsMessage.new)
end
- @users_flush_task.execute
+ @contexts_flush_task.execute
if !diagnostic_accumulator.nil?
interval = test_properties && test_properties.has_key?(:diagnostic_recording_interval) ?
test_properties[:diagnostic_recording_interval] :
config.diagnostic_recording_interval
@diagnostic_event_task = Concurrent::TimerTask.new(execution_interval: interval) do
@@ -140,62 +134,52 @@
end
@stopped = Concurrent::AtomicBoolean.new(false)
@inbox_full = Concurrent::AtomicBoolean.new(false)
event_sender = (test_properties || {})[:event_sender] ||
- Impl::EventSender.new(sdk_key, config, client ? client : Util.new_http_client(config.events_uri, config))
+ Impl::EventSender.new(sdk_key, config, client || Util.new_http_client(config.events_uri, config))
@timestamp_fn = (test_properties || {})[:timestamp_fn] || proc { Impl::Util.current_time_millis }
EventDispatcher.new(@inbox, sdk_key, config, diagnostic_accumulator, event_sender)
end
def record_eval_event(
- user,
+ context,
key,
version = nil,
variation = nil,
value = nil,
reason = nil,
default = nil,
track_events = false,
debug_until = nil,
prereq_of = nil
)
- post_to_inbox(LaunchDarkly::Impl::EvalEvent.new(timestamp, user, key, version, variation, value, reason,
+ post_to_inbox(LaunchDarkly::Impl::EvalEvent.new(timestamp, context, key, version, variation, value, reason,
default, track_events, debug_until, prereq_of))
end
- def record_identify_event(user)
- post_to_inbox(LaunchDarkly::Impl::IdentifyEvent.new(timestamp, user))
+ def record_identify_event(context)
+ post_to_inbox(LaunchDarkly::Impl::IdentifyEvent.new(timestamp, context))
end
- def record_custom_event(user, key, data = nil, metric_value = nil)
- post_to_inbox(LaunchDarkly::Impl::CustomEvent.new(timestamp, user, key, data, metric_value))
+ def record_custom_event(context, key, data = nil, metric_value = nil)
+ post_to_inbox(LaunchDarkly::Impl::CustomEvent.new(timestamp, context, key, data, metric_value))
end
- def record_alias_event(user, previous_user)
- post_to_inbox(LaunchDarkly::Impl::AliasEvent.new(
- timestamp,
- user.nil? ? nil : user[:key],
- user_to_context_kind(user),
- previous_user.nil? ? nil : previous_user[:key],
- user_to_context_kind(previous_user)
- ))
- end
-
def flush
# flush is done asynchronously
post_to_inbox(FlushMessage.new)
end
def stop
# final shutdown, which includes a final flush, is done synchronously
if @stopped.make_true
@flush_task.shutdown
- @users_flush_task.shutdown
- @diagnostic_event_task.shutdown if !@diagnostic_event_task.nil?
+ @contexts_flush_task.shutdown
+ @diagnostic_event_task.shutdown unless @diagnostic_event_task.nil?
# Note that here we are not calling post_to_inbox, because we *do* want to wait if the inbox
# is full; an orderly shutdown can't happen unless these messages are received.
@inbox << FlushMessage.new
stop_msg = StopMessage.new
@inbox << stop_msg
@@ -225,31 +209,27 @@
if @inbox_full.make_true
@logger.warn { "[LDClient] Events are being produced faster than they can be processed; some events will be dropped" }
end
end
end
-
- private def user_to_context_kind(user)
- (user.nil? || !user[:anonymous]) ? 'user' : 'anonymousUser'
- end
end
# @private
class EventDispatcher
def initialize(inbox, sdk_key, config, diagnostic_accumulator, event_sender)
@sdk_key = sdk_key
@config = config
@diagnostic_accumulator = config.diagnostic_opt_out? ? nil : diagnostic_accumulator
@event_sender = event_sender
- @user_keys = SimpleLRUCacheSet.new(config.user_keys_capacity)
+ @context_keys = SimpleLRUCacheSet.new(config.context_keys_capacity)
@formatter = EventOutputFormatter.new(config)
@disabled = Concurrent::AtomicBoolean.new(false)
@last_known_past_time = Concurrent::AtomicReference.new(0)
- @deduplicated_users = 0
+ @deduplicated_contexts = 0
@events_in_last_batch = 0
-
+
outbox = EventBuffer.new(config.capacity, config.logger)
flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS)
if !@diagnostic_accumulator.nil?
diagnostic_event_workers = NonBlockingThreadPool.new(1)
@@ -270,12 +250,12 @@
begin
message = inbox.pop
case message
when FlushMessage
trigger_flush(outbox, flush_workers)
- when FlushUsersMessage
- @user_keys.clear
+ when FlushContextsMessage
+ @context_keys.clear
when DiagnosticEventMessage
send_and_reset_diagnostics(outbox, diagnostic_event_workers)
when TestSyncMessage
synchronize_for_testing(flush_workers, diagnostic_event_workers)
message.completed
@@ -293,21 +273,21 @@
end
def do_shutdown(flush_workers, diagnostic_event_workers)
flush_workers.shutdown
flush_workers.wait_for_termination
- if !diagnostic_event_workers.nil?
+ unless diagnostic_event_workers.nil?
diagnostic_event_workers.shutdown
diagnostic_event_workers.wait_for_termination
end
@event_sender.stop if @event_sender.respond_to?(:stop)
end
def synchronize_for_testing(flush_workers, diagnostic_event_workers)
# Used only by unit tests. Wait until all active flush workers have finished.
flush_workers.wait_all
- diagnostic_event_workers.wait_all if !diagnostic_event_workers.nil?
+ diagnostic_event_workers.wait_all unless diagnostic_event_workers.nil?
end
def dispatch_event(event, outbox)
return if @disabled.value
@@ -325,31 +305,30 @@
end
else
will_add_full_event = true
end
- # For each user we haven't seen before, we add an index event - unless this is already
- # an identify event for that user.
- if !(will_add_full_event && @config.inline_users_in_events)
- if !event.user.nil? && !notice_user(event.user) && !event.is_a?(LaunchDarkly::Impl::IdentifyEvent)
- outbox.add_event(LaunchDarkly::Impl::IndexEvent.new(event.timestamp, event.user))
- end
+ # For each context we haven't seen before, we add an index event - unless this is already
+ # an identify event for that context.
+ if !event.context.nil? && !notice_context(event.context) && !event.is_a?(LaunchDarkly::Impl::IdentifyEvent)
+ outbox.add_event(LaunchDarkly::Impl::IndexEvent.new(event.timestamp, event.context))
end
outbox.add_event(event) if will_add_full_event
- outbox.add_event(debug_event) if !debug_event.nil?
+ outbox.add_event(debug_event) unless debug_event.nil?
end
- # Add to the set of users we've noticed, and return true if the user was already known to us.
- def notice_user(user)
- if user.nil? || !user.has_key?(:key)
- true
- else
- known = @user_keys.add(user[:key].to_s)
- @deduplicated_users += 1 if known
- known
- end
+ #
+ # Add to the set of contexts we've noticed, and return true if the context
+ # was already known to us.
+ # @param context [LaunchDarkly::LDContext]
+ # @return [Boolean]
+ #
+ def notice_context(context)
+ known = @context_keys.add(context.fully_qualified_key)
+ @deduplicated_contexts += 1 if known
+ known
end
def should_debug_event(event)
debug_until = event.debug_until
if !debug_until.nil?
@@ -363,21 +342,21 @@
def trigger_flush(outbox, flush_workers)
if @disabled.value
return
end
- payload = outbox.get_payload
+ payload = outbox.get_payload
if !payload.events.empty? || !payload.summary.counters.empty?
count = payload.events.length + (payload.summary.counters.empty? ? 0 : 1)
@events_in_last_batch = count
# If all available worker threads are busy, success will be false and no job will be queued.
success = flush_workers.post do
begin
events_out = @formatter.make_output_events(payload.events, payload.summary)
result = @event_sender.send_event_data(events_out.to_json, "#{events_out.length} events", false)
@disabled.value = true if result.must_shutdown
- if !result.time_from_server.nil?
+ unless result.time_from_server.nil?
@last_known_past_time.value = (result.time_from_server.to_f * 1000).to_i
end
rescue => e
Util.log_exception(@config.logger, "Unexpected error in event processor", e)
end
@@ -389,12 +368,12 @@
end
def send_and_reset_diagnostics(outbox, diagnostic_event_workers)
return if @diagnostic_accumulator.nil?
dropped_count = outbox.get_and_clear_dropped_count
- event = @diagnostic_accumulator.create_periodic_event_and_reset(dropped_count, @deduplicated_users, @events_in_last_batch)
- @deduplicated_users = 0
+ event = @diagnostic_accumulator.create_periodic_event_and_reset(dropped_count, @deduplicated_contexts, @events_in_last_batch)
+ @deduplicated_contexts = 0
@events_in_last_batch = 0
send_diagnostic_event(event, diagnostic_event_workers)
end
def send_diagnostic_event(event, diagnostic_event_workers)
@@ -428,11 +407,11 @@
if @events.length < @capacity
@events.push(event)
@capacity_exceeded = false
else
@dropped_events += 1
- if !@capacity_exceeded
+ unless @capacity_exceeded
@capacity_exceeded = true
@logger.warn { "[LDClient] Exceeded event queue capacity. Increase capacity to avoid dropping events." }
end
end
end
@@ -440,11 +419,11 @@
def add_to_summary(event)
@summarizer.summarize_event(event)
end
def get_payload
- return FlushPayload.new(@events, @summarizer.snapshot)
+ FlushPayload.new(@events, @summarizer.snapshot)
end
def get_and_clear_dropped_count
ret = @dropped_events
@dropped_events = 0
@@ -460,101 +439,85 @@
# @private
class EventOutputFormatter
FEATURE_KIND = 'feature'
IDENTIFY_KIND = 'identify'
CUSTOM_KIND = 'custom'
- ALIAS_KIND = 'alias'
INDEX_KIND = 'index'
DEBUG_KIND = 'debug'
SUMMARY_KIND = 'summary'
- ANONYMOUS_USER_CONTEXT_KIND = 'anonymousUser'
def initialize(config)
- @inline_users = config.inline_users_in_events
- @user_filter = UserFilter.new(config)
+ @context_filter = LaunchDarkly::Impl::ContextFilter.new(config.all_attributes_private, config.private_attributes)
end
# Transforms events into the format used for event sending.
def make_output_events(events, summary)
events_out = events.map { |e| make_output_event(e) }
- if !summary.counters.empty?
+ unless summary.counters.empty?
events_out.push(make_summary_event(summary))
end
events_out
end
private def make_output_event(event)
case event
-
+
when LaunchDarkly::Impl::EvalEvent
out = {
kind: FEATURE_KIND,
creationDate: event.timestamp,
key: event.key,
- value: event.value
+ value: event.value,
}
- out[:default] = event.default if !event.default.nil?
- out[:variation] = event.variation if !event.variation.nil?
- out[:version] = event.version if !event.version.nil?
- out[:prereqOf] = event.prereq_of if !event.prereq_of.nil?
- set_opt_context_kind(out, event.user)
- set_user_or_user_key(out, event.user)
- out[:reason] = event.reason if !event.reason.nil?
+ out[:default] = event.default unless event.default.nil?
+ out[:variation] = event.variation unless event.variation.nil?
+ out[:version] = event.version unless event.version.nil?
+ out[:prereqOf] = event.prereq_of unless event.prereq_of.nil?
+ out[:contextKeys] = event.context.keys
+ out[:reason] = event.reason unless event.reason.nil?
out
when LaunchDarkly::Impl::IdentifyEvent
{
kind: IDENTIFY_KIND,
creationDate: event.timestamp,
- key: event.user[:key].to_s,
- user: process_user(event.user)
+ key: event.context.fully_qualified_key,
+ context: @context_filter.filter(event.context),
}
-
+
when LaunchDarkly::Impl::CustomEvent
out = {
kind: CUSTOM_KIND,
creationDate: event.timestamp,
- key: event.key
+ key: event.key,
}
- out[:data] = event.data if !event.data.nil?
- set_user_or_user_key(out, event.user)
- out[:metricValue] = event.metric_value if !event.metric_value.nil?
- set_opt_context_kind(out, event.user)
+ out[:data] = event.data unless event.data.nil?
+ out[:contextKeys] = event.context.keys
+ out[:metricValue] = event.metric_value unless event.metric_value.nil?
out
- when LaunchDarkly::Impl::AliasEvent
- {
- kind: ALIAS_KIND,
- creationDate: event.timestamp,
- key: event.key,
- contextKind: event.context_kind,
- previousKey: event.previous_key,
- previousContextKind: event.previous_context_kind
- }
-
when LaunchDarkly::Impl::IndexEvent
{
kind: INDEX_KIND,
creationDate: event.timestamp,
- user: process_user(event.user)
+ context: @context_filter.filter(event.context),
}
-
+
when LaunchDarkly::Impl::DebugEvent
original = event.eval_event
out = {
kind: DEBUG_KIND,
creationDate: original.timestamp,
key: original.key,
- user: process_user(original.user),
- value: original.value
+ context: @context_filter.filter(original.context),
+ value: original.value,
}
- out[:default] = original.default if !original.default.nil?
- out[:variation] = original.variation if !original.variation.nil?
- out[:version] = original.version if !original.version.nil?
- out[:prereqOf] = original.prereq_of if !original.prereq_of.nil?
- set_opt_context_kind(out, original.user)
- out[:reason] = original.reason if !original.reason.nil?
+ out[:default] = original.default unless original.default.nil?
+ out[:variation] = original.variation unless original.variation.nil?
+ out[:version] = original.version unless original.version.nil?
+ out[:prereqOf] = original.prereq_of unless original.prereq_of.nil?
+ out[:reason] = original.reason unless original.reason.nil?
out
else
nil
end
@@ -567,45 +530,27 @@
counters = []
flagInfo.versions.each do |version, variations|
variations.each do |variation, counter|
c = {
value: counter.value,
- count: counter.count
+ count: counter.count,
}
- c[:variation] = variation if !variation.nil?
+ c[:variation] = variation unless variation.nil?
if version.nil?
c[:unknown] = true
else
c[:version] = version
end
counters.push(c)
end
end
- flags[flagKey] = { default: flagInfo.default, counters: counters }
+ flags[flagKey] = { default: flagInfo.default, counters: counters, contextKinds: flagInfo.context_kinds.to_a }
end
{
kind: SUMMARY_KIND,
startDate: summary[:start_date],
endDate: summary[:end_date],
- features: flags
+ features: flags,
}
- end
-
- private def set_opt_context_kind(out, user)
- out[:contextKind] = ANONYMOUS_USER_CONTEXT_KIND if !user.nil? && user[:anonymous]
- end
-
- private def set_user_or_user_key(out, user)
- if @inline_users
- out[:user] = process_user(user)
- else
- key = user[:key]
- out[:userKey] = key.is_a?(String) ? key : key.to_s
- end
- end
-
- private def process_user(user)
- filtered = @user_filter.transform_user_props(user)
- Util.stringify_attrs(filtered, USER_ATTRS_TO_STRINGIFY_FOR_EVENTS)
end
end
end