lib/liveqa/processor/worker.rb in liveqa-1.9.3 vs lib/liveqa/processor/worker.rb in liveqa-1.9.4

- old
+ new

@@ -1,69 +1,112 @@ module LiveQA module Processor class Worker - DEFAULT_WAIT_TIME = 2 + FLUSH_INTERVAL_SECONDS = 3 - attr_reader :queue - attr_reader :batches - attr_reader :executor + FLUSH_MESSAGE = Object.new + SHUTDOWN_MESSAGE = Object.new - def initialize(queue, executor) + def initialize(queue, state) @queue = queue - @batches = [] + @state = state - @executor = executor + @batch = Batch.new + @promises = Concurrent::Array.new + + @timer = Concurrent::TimerTask.new(execution_interval: FLUSH_INTERVAL_SECONDS) { @queue << FLUSH_MESSAGE } + @timer.execute end def run - while running? - return if queue.empty? && batches.empty? + while thread_active? + message = @queue.pop - sleep(DEFAULT_WAIT_TIME) if queue.empty? - - create_new_batch unless queue.empty? - - send_batches + add_message_to_batch(message) end + + shutdown_worker end private - def running? - return true if !queue.empty? || !batches.empty? - executor.running? + ## + # Add the message to a batch + # + # - Add message to a batch if it's not a flush message + # - flush batch if flush message is receive or batch is full + def add_message_to_batch(message) + @batch << message if message != FLUSH_MESSAGE + flush if message == FLUSH_MESSAGE || @batch.full? end - def create_new_batch - batch = Batch.new - batch << queue.pop until batch.full? || queue.empty? - batches.push(batch) + ## + # Shutdown the worker + # + # - Close the timer + # - Retreive the last messages + # - Wait all the request to be completed + # + # rubocop:disable Lint/HandleExceptions + def shutdown_worker + @timer.shutdown + + begin + until (message = @queue.pop(true)).nil? + add_message_to_batch(message) + end + rescue ThreadError => _ + end + + flush + + @promises.each do |promise| + promise.wait if promise && !promise.fulfilled? + end end + # rubocop:enable Lint/HandleExceptions - def send_batches - batches.dup.each do - batch = batches.pop + def flush + return if @batch.empty? - unless batch.can_run? - batches.push(batch) - next - end + send_batch(@batch) + @batch = Batch.new + end - begin - LiveQA::Batch.create(Message.base.merge(data: batch.messages)) - rescue LiveQA::RequestError => error - return LiveQA.configurations.logger.error(error.message) if [401, 404].include?(error.http_status) - batch_retry(batch) - rescue Errno::ECONNREFUSED, OpenSSL::SSL::SSLError => _error - batch_retry(batch) + # rubocop:disable Metrics/AbcSize + def send_batch(batch) + promise = + Concurrent::Promise + .new { LiveQA::Batch.create(Message.base.merge(data: batch.messages)) } + .on_success { @promises.delete(promise) } + .rescue do |error| + batch.update_retry + + if error.is_a?(LiveQA::RequestError) && [401, 404].include?(error.http_status) + return logger.error(error.message) + end + + if thread_active? && batch.can_retry? + Concurrent::ScheduledTask.new(batch.next_retry) { send_batch(batch) }.execute + else + @promises.delete(promise) + end end - end + + promise.execute + @promises << promise + rescue Concurrent::RejectedExecutionError => _ + logger.error('Impossible to start a thread in closing application') end + # rubocop:enable Metrics/AbcSize - def batch_retry(batch) - batch.update_retry - batches.push(batch) if batch.can_retry? + def thread_active? + @state.true? + end + + def logger + LiveQA.configurations.logger end end end end