lib/ldclient-rb/events.rb in launchdarkly-server-sdk-5.5.11 vs lib/ldclient-rb/events.rb in launchdarkly-server-sdk-5.5.12
- old
+ new
@@ -2,10 +2,27 @@
require "concurrent/atomics"
require "concurrent/executors"
require "thread"
require "time"
+#
+# Analytics event processing in the SDK involves several components. The purpose of this design is to
+# minimize overhead on the application threads that are generating analytics events.
+#
+# EventProcessor receives an analytics event from the SDK client, on an application thread. It places
+# the event in a bounded queue, the "inbox", and immediately returns.
+#
+# On a separate worker thread, EventDispatcher consumes events from the inbox. These are considered
+# "input events" because they may or may not actually be sent to LaunchDarkly; most flag evaluation
+# events are not sent, but are counted and the counters become part of a single summary event.
+# EventDispatcher updates those counters, creates "index" events for any users that have not been seen
+# recently, and places any events that will be sent to LaunchDarkly into the "outbox" queue.
+#
+# When it is time to flush events to LaunchDarkly, the contents of the outbox are handed off to
+# another worker thread which sends the HTTP request.
+#
+
module LaunchDarkly
MAX_FLUSH_WORKERS = 5
CURRENT_SCHEMA_VERSION = 3
USER_ATTRS_TO_STRINGIFY_FOR_EVENTS = [ :key, :secondary, :ip, :country, :email, :firstName, :lastName,
:avatar, :name ]
@@ -66,57 +83,77 @@
end
# @private
class EventProcessor
def initialize(sdk_key, config, client = nil)
- @queue = Queue.new
+ @logger = config.logger
+ @inbox = SizedQueue.new(config.capacity)
@flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) do
- @queue << FlushMessage.new
+ post_to_inbox(FlushMessage.new)
end
@flush_task.execute
@users_flush_task = Concurrent::TimerTask.new(execution_interval: config.user_keys_flush_interval) do
- @queue << FlushUsersMessage.new
+ post_to_inbox(FlushUsersMessage.new)
end
@users_flush_task.execute
@stopped = Concurrent::AtomicBoolean.new(false)
-
- EventDispatcher.new(@queue, sdk_key, config, client)
+ @inbox_full = Concurrent::AtomicBoolean.new(false)
+
+ EventDispatcher.new(@inbox, sdk_key, config, client)
end
def add_event(event)
event[:creationDate] = (Time.now.to_f * 1000).to_i
- @queue << EventMessage.new(event)
+ post_to_inbox(EventMessage.new(event))
end
def flush
# flush is done asynchronously
- @queue << FlushMessage.new
+ post_to_inbox(FlushMessage.new)
end
def stop
# final shutdown, which includes a final flush, is done synchronously
if @stopped.make_true
@flush_task.shutdown
@users_flush_task.shutdown
- @queue << FlushMessage.new
+ # Note that here we are not calling post_to_inbox, because we *do* want to wait if the inbox
+ # is full; an orderly shutdown can't happen unless these messages are received.
+ @inbox << FlushMessage.new
stop_msg = StopMessage.new
- @queue << stop_msg
+ @inbox << stop_msg
stop_msg.wait_for_completion
end
end
# exposed only for testing
def wait_until_inactive
sync_msg = TestSyncMessage.new
- @queue << sync_msg
+ @inbox << sync_msg
sync_msg.wait_for_completion
end
+
+ private
+
# Places a message on the inbox without ever making the calling thread wait.
#
# If the inbox is full, it means the EventDispatcher thread is seriously backed up with not-yet-processed
# events. This is unlikely, but if it happens, it means the application is probably doing a ton of flag
# evaluations across many threads, so waiting for a space in the inbox would risk a very serious slowdown
# of the app. To avoid that, the message is simply dropped.
#
# @param message [Object] the message to enqueue on @inbox
# @return [Object] the result of @inbox.push when the message was queued; otherwise the value of the
#   logging branch below (callers in this file do not use it)
def post_to_inbox(message)
  # The second positional argument of SizedQueue#push is its non_block flag. With it set, a full queue
  # raises ThreadError instead of suspending the caller. It is passed as a bare `true` because writing
  # `non_block=true` here would only assign an unused local variable, not a keyword argument.
  @inbox.push(message, true)
rescue ThreadError
  # make_true gates the warning so that it is only shown once, no matter how many messages are dropped.
  if @inbox_full.make_true
    @logger.warn { "[LDClient] Events are being produced faster than they can be processed; some events will be dropped" }
  end
end
end
# @private
class EventDispatcher
- def initialize(queue, sdk_key, config, client)
+ def initialize(inbox, sdk_key, config, client)
@sdk_key = sdk_key
@config = config
if client
@client = client
@@ -127,32 +164,32 @@
@user_keys = SimpleLRUCacheSet.new(config.user_keys_capacity)
@formatter = EventOutputFormatter.new(config)
@disabled = Concurrent::AtomicBoolean.new(false)
@last_known_past_time = Concurrent::AtomicReference.new(0)
- buffer = EventBuffer.new(config.capacity, config.logger)
+ outbox = EventBuffer.new(config.capacity, config.logger)
flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS)
- Thread.new { main_loop(queue, buffer, flush_workers) }
+ Thread.new { main_loop(inbox, outbox, flush_workers) }
end
private
# Current wall-clock time expressed in whole milliseconds.
#
# @return [Integer] Time.now as fractional seconds, scaled by 1000 and truncated
def now_millis
  fractional_seconds = Time.now.to_f
  scaled = fractional_seconds * 1000
  scaled.to_i
end
- def main_loop(queue, buffer, flush_workers)
+ def main_loop(inbox, outbox, flush_workers)
running = true
while running do
begin
- message = queue.pop
+ message = inbox.pop
case message
when EventMessage
- dispatch_event(message.event, buffer)
+ dispatch_event(message.event, outbox)
when FlushMessage
- trigger_flush(buffer, flush_workers)
+ trigger_flush(outbox, flush_workers)
when FlushUsersMessage
@user_keys.clear
when TestSyncMessage
synchronize_for_testing(flush_workers)
message.completed
@@ -179,15 +216,15 @@
# Test-only hook: waits until all active flush workers have finished.
#
# @param flush_workers [#wait_all] the flush worker pool to wait on
# @return [Object] whatever flush_workers.wait_all returns
def synchronize_for_testing(flush_workers)
  flush_workers.wait_all
end
- def dispatch_event(event, buffer)
+ def dispatch_event(event, outbox)
return if @disabled.value
# Always record the event in the summary.
- buffer.add_to_summary(event)
+ outbox.add_to_summary(event)
# Decide whether to add the event to the payload. Feature events may be added twice, once for
# the event (if tracked) and once for debugging.
will_add_full_event = false
debug_event = nil
@@ -203,20 +240,20 @@
# For each user we haven't seen before, we add an index event - unless this is already
# an identify event for that user.
if !(will_add_full_event && @config.inline_users_in_events)
if event.has_key?(:user) && !notice_user(event[:user]) && event[:kind] != "identify"
- buffer.add_event({
+ outbox.add_event({
kind: "index",
creationDate: event[:creationDate],
user: event[:user]
})
end
end
- buffer.add_event(event) if will_add_full_event
- buffer.add_event(debug_event) if !debug_event.nil?
+ outbox.add_event(event) if will_add_full_event
+ outbox.add_event(debug_event) if !debug_event.nil?
end
# Add to the set of users we've noticed, and return true if the user was already known to us.
def notice_user(user)
if user.nil? || !user.has_key?(:key)
@@ -234,26 +271,26 @@
else
false
end
end
- def trigger_flush(buffer, flush_workers)
+ def trigger_flush(outbox, flush_workers)
if @disabled.value
return
end
- payload = buffer.get_payload
+ payload = outbox.get_payload
if !payload.events.empty? || !payload.summary.counters.empty?
# If all available worker threads are busy, success will be false and no job will be queued.
success = flush_workers.post do
begin
resp = EventPayloadSendTask.new.run(@sdk_key, @config, @client, payload, @formatter)
handle_response(resp) if !resp.nil?
rescue => e
Util.log_exception(@config.logger, "Unexpected error in event processor", e)
end
end
- buffer.clear if success # Reset our internal state, these events now belong to the flush worker
+ outbox.clear if success # Reset our internal state, these events now belong to the flush worker
end
end
def handle_response(res)
status = res.code.to_i