lib/ldclient-rb/events.rb in launchdarkly-server-sdk-5.5.11 vs lib/ldclient-rb/events.rb in launchdarkly-server-sdk-5.5.12

- old
+ new

@@ -2,10 +2,27 @@ require "concurrent/atomics" require "concurrent/executors" require "thread" require "time" +# +# Analytics event processing in the SDK involves several components. The purpose of this design is to +# minimize overhead on the application threads that are generating analytics events. +# +# EventProcessor receives an analytics event from the SDK client, on an application thread. It places +# the event in a bounded queue, the "inbox", and immediately returns. +# +# On a separate worker thread, EventDispatcher consumes events from the inbox. These are considered +# "input events" because they may or may not actually be sent to LaunchDarkly; most flag evaluation +# events are not sent, but are counted and the counters become part of a single summary event. +# EventDispatcher updates those counters, creates "index" events for any users that have not been seen +# recently, and places any events that will be sent to LaunchDarkly into the "outbox" queue. +# +# When it is time to flush events to LaunchDarkly, the contents of the outbox are handed off to +# another worker thread which sends the HTTP request. +# + module LaunchDarkly MAX_FLUSH_WORKERS = 5 CURRENT_SCHEMA_VERSION = 3 USER_ATTRS_TO_STRINGIFY_FOR_EVENTS = [ :key, :secondary, :ip, :country, :email, :firstName, :lastName, :avatar, :name ] @@ -66,57 +83,77 @@ end # @private class EventProcessor def initialize(sdk_key, config, client = nil) - @queue = Queue.new + @logger = config.logger + @inbox = SizedQueue.new(config.capacity) @flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) do - @queue << FlushMessage.new + post_to_inbox(FlushMessage.new) end @flush_task.execute @users_flush_task = Concurrent::TimerTask.new(execution_interval: config.user_keys_flush_interval) do - @queue << FlushUsersMessage.new + post_to_inbox(FlushUsersMessage.new) end @users_flush_task.execute @stopped = Concurrent::AtomicBoolean.new(false) - - EventDispatcher.new(@queue, sdk_key, config, client) + @inbox_full = Concurrent::AtomicBoolean.new(false) + + EventDispatcher.new(@inbox, sdk_key, config, client) end def add_event(event) event[:creationDate] = (Time.now.to_f * 1000).to_i - @queue << EventMessage.new(event) + post_to_inbox(EventMessage.new(event)) end def flush # flush is done asynchronously - @queue << FlushMessage.new + post_to_inbox(FlushMessage.new) end def stop # final shutdown, which includes a final flush, is done synchronously if @stopped.make_true @flush_task.shutdown @users_flush_task.shutdown - @queue << FlushMessage.new + # Note that here we are not calling post_to_inbox, because we *do* want to wait if the inbox + # is full; an orderly shutdown can't happen unless these messages are received. + @inbox << FlushMessage.new stop_msg = StopMessage.new - @queue << stop_msg + @inbox << stop_msg stop_msg.wait_for_completion end end # exposed only for testing def wait_until_inactive sync_msg = TestSyncMessage.new - @queue << sync_msg + @inbox << sync_msg sync_msg.wait_for_completion end + + private + + def post_to_inbox(message) + begin + @inbox.push(message, non_block=true) + rescue ThreadError + # If the inbox is full, it means the EventDispatcher thread is seriously backed up with not-yet-processed + # events. This is unlikely, but if it happens, it means the application is probably doing a ton of flag + # evaluations across many threads-- so if we wait for a space in the inbox, we risk a very serious slowdown + # of the app. To avoid that, we'll just drop the event. The log warning about this will only be shown once. + if @inbox_full.make_true + @logger.warn { "[LDClient] Events are being produced faster than they can be processed; some events will be dropped" } + end + end + end end # @private class EventDispatcher - def initialize(queue, sdk_key, config, client) + def initialize(inbox, sdk_key, config, client) @sdk_key = sdk_key @config = config if client @client = client @@ -127,32 +164,32 @@ @user_keys = SimpleLRUCacheSet.new(config.user_keys_capacity) @formatter = EventOutputFormatter.new(config) @disabled = Concurrent::AtomicBoolean.new(false) @last_known_past_time = Concurrent::AtomicReference.new(0) - buffer = EventBuffer.new(config.capacity, config.logger) + outbox = EventBuffer.new(config.capacity, config.logger) flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS) - Thread.new { main_loop(queue, buffer, flush_workers) } + Thread.new { main_loop(inbox, outbox, flush_workers) } end private def now_millis() (Time.now.to_f * 1000).to_i end - def main_loop(queue, buffer, flush_workers) + def main_loop(inbox, outbox, flush_workers) running = true while running do begin - message = queue.pop + message = inbox.pop case message when EventMessage - dispatch_event(message.event, buffer) + dispatch_event(message.event, outbox) when FlushMessage - trigger_flush(buffer, flush_workers) + trigger_flush(outbox, flush_workers) when FlushUsersMessage @user_keys.clear when TestSyncMessage synchronize_for_testing(flush_workers) message.completed @@ -179,15 +216,15 @@ def synchronize_for_testing(flush_workers) # Used only by unit tests. Wait until all active flush workers have finished. flush_workers.wait_all end - def dispatch_event(event, buffer) + def dispatch_event(event, outbox) return if @disabled.value # Always record the event in the summary. - buffer.add_to_summary(event) + outbox.add_to_summary(event) # Decide whether to add the event to the payload. Feature events may be added twice, once for # the event (if tracked) and once for debugging. will_add_full_event = false debug_event = nil @@ -203,20 +240,20 @@ # For each user we haven't seen before, we add an index event - unless this is already # an identify event for that user. if !(will_add_full_event && @config.inline_users_in_events) if event.has_key?(:user) && !notice_user(event[:user]) && event[:kind] != "identify" - buffer.add_event({ + outbox.add_event({ kind: "index", creationDate: event[:creationDate], user: event[:user] }) end end - buffer.add_event(event) if will_add_full_event - buffer.add_event(debug_event) if !debug_event.nil? + outbox.add_event(event) if will_add_full_event + outbox.add_event(debug_event) if !debug_event.nil? end # Add to the set of users we've noticed, and return true if the user was already known to us. def notice_user(user) if user.nil? || !user.has_key?(:key) @@ -234,26 +271,26 @@ else false end end - def trigger_flush(buffer, flush_workers) + def trigger_flush(outbox, flush_workers) if @disabled.value return end - payload = buffer.get_payload + payload = outbox.get_payload if !payload.events.empty? || !payload.summary.counters.empty? # If all available worker threads are busy, success will be false and no job will be queued. success = flush_workers.post do begin resp = EventPayloadSendTask.new.run(@sdk_key, @config, @client, payload, @formatter) handle_response(resp) if !resp.nil? rescue => e Util.log_exception(@config.logger, "Unexpected error in event processor", e) end end - buffer.clear if success # Reset our internal state, these events now belong to the flush worker + outbox.clear if success # Reset our internal state, these events now belong to the flush worker end end def handle_response(res) status = res.code.to_i