module LiveQA module Processor class Worker FLUSH_INTERVAL_SECONDS = 3 FLUSH_MESSAGE = Object.new SHUTDOWN_MESSAGE = Object.new def initialize(queue, state) @queue = queue @state = state @batch = Batch.new @promises = Concurrent::Array.new @timer = Concurrent::TimerTask.new(execution_interval: FLUSH_INTERVAL_SECONDS) { @queue << FLUSH_MESSAGE } @timer.execute end def run while thread_active? message = @queue.pop add_message_to_batch(message) end shutdown_worker end private ## # Add the message to a batch # # - Add message to a batch if it's not a flush message # - flush batch if flush message is receive or batch is full def add_message_to_batch(message) @batch << message if message != FLUSH_MESSAGE flush if message == FLUSH_MESSAGE || @batch.full? end ## # Shutdown the worker # # - Close the timer # - Retreive the last messages # - Wait all the request to be completed # # rubocop:disable Lint/HandleExceptions def shutdown_worker @timer.shutdown begin until (message = @queue.pop(true)).nil? add_message_to_batch(message) end rescue ThreadError => _ end flush @promises.each do |promise| promise.wait if promise && !promise.fulfilled? end end # rubocop:enable Lint/HandleExceptions def flush return if @batch.empty? send_batch(@batch) @batch = Batch.new end # rubocop:disable Metrics/AbcSize def send_batch(batch) promise = Concurrent::Promise .new { LiveQA::Batch.create(Message.base.merge(data: batch.messages)) } .on_success { @promises.delete(promise) } .rescue do |error| batch.update_retry if error.is_a?(LiveQA::RequestError) && [401, 404].include?(error.http_status) return logger.error(error.message) end if thread_active? && batch.can_retry? Concurrent::ScheduledTask.new(batch.next_retry) { send_batch(batch) }.execute else @promises.delete(promise) end end promise.execute @promises << promise rescue Concurrent::RejectedExecutionError => _ logger.error('Impossible to start a thread in closing application') end # rubocop:enable Metrics/AbcSize def thread_active? @state.true? end def logger LiveQA.configurations.logger end end end end