lib/liveqa/processor/worker.rb in liveqa-1.9.3 vs lib/liveqa/processor/worker.rb in liveqa-1.9.4
- old
+ new
@@ -1,69 +1,112 @@
module LiveQA
module Processor
class Worker
- DEFAULT_WAIT_TIME = 2
+ FLUSH_INTERVAL_SECONDS = 3
- attr_reader :queue
- attr_reader :batches
- attr_reader :executor
+ FLUSH_MESSAGE = Object.new
+ SHUTDOWN_MESSAGE = Object.new
- def initialize(queue, executor)
+ def initialize(queue, state)
@queue = queue
- @batches = []
+ @state = state
- @executor = executor
+ @batch = Batch.new
+ @promises = Concurrent::Array.new
+
+ @timer = Concurrent::TimerTask.new(execution_interval: FLUSH_INTERVAL_SECONDS) { @queue << FLUSH_MESSAGE }
+ @timer.execute
end
def run
- while running?
- return if queue.empty? && batches.empty?
+ while thread_active?
+ message = @queue.pop
- sleep(DEFAULT_WAIT_TIME) if queue.empty?
-
- create_new_batch unless queue.empty?
-
- send_batches
+ add_message_to_batch(message)
end
+
+ shutdown_worker
end
private
- def running?
- return true if !queue.empty? || !batches.empty?
- executor.running?
+ ##
+ # Add the message to a batch
+ #
+ # - Add message to a batch if it's not a flush message
+ # - flush batch if flush message is receive or batch is full
+ def add_message_to_batch(message)
+ @batch << message if message != FLUSH_MESSAGE
+ flush if message == FLUSH_MESSAGE || @batch.full?
end
- def create_new_batch
- batch = Batch.new
- batch << queue.pop until batch.full? || queue.empty?
- batches.push(batch)
+ ##
+ # Shutdown the worker
+ #
+ # - Close the timer
+ # - Retreive the last messages
+ # - Wait all the request to be completed
+ #
+ # rubocop:disable Lint/HandleExceptions
+ def shutdown_worker
+ @timer.shutdown
+
+ begin
+ until (message = @queue.pop(true)).nil?
+ add_message_to_batch(message)
+ end
+ rescue ThreadError => _
+ end
+
+ flush
+
+ @promises.each do |promise|
+ promise.wait if promise && !promise.fulfilled?
+ end
end
+ # rubocop:enable Lint/HandleExceptions
- def send_batches
- batches.dup.each do
- batch = batches.pop
+ def flush
+ return if @batch.empty?
- unless batch.can_run?
- batches.push(batch)
- next
- end
+ send_batch(@batch)
+ @batch = Batch.new
+ end
- begin
- LiveQA::Batch.create(Message.base.merge(data: batch.messages))
- rescue LiveQA::RequestError => error
- return LiveQA.configurations.logger.error(error.message) if [401, 404].include?(error.http_status)
- batch_retry(batch)
- rescue Errno::ECONNREFUSED, OpenSSL::SSL::SSLError => _error
- batch_retry(batch)
+ # rubocop:disable Metrics/AbcSize
+ def send_batch(batch)
+ promise =
+ Concurrent::Promise
+ .new { LiveQA::Batch.create(Message.base.merge(data: batch.messages)) }
+ .on_success { @promises.delete(promise) }
+ .rescue do |error|
+ batch.update_retry
+
+ if error.is_a?(LiveQA::RequestError) && [401, 404].include?(error.http_status)
+ return logger.error(error.message)
+ end
+
+ if thread_active? && batch.can_retry?
+ Concurrent::ScheduledTask.new(batch.next_retry) { send_batch(batch) }.execute
+ else
+ @promises.delete(promise)
+ end
end
- end
+
+ promise.execute
+ @promises << promise
+ rescue Concurrent::RejectedExecutionError => _
+ logger.error('Impossible to start a thread in closing application')
end
+ # rubocop:enable Metrics/AbcSize
- def batch_retry(batch)
- batch.update_retry
- batches.push(batch) if batch.can_retry?
+ def thread_active?
+ @state.true?
+ end
+
+ def logger
+ LiveQA.configurations.logger
end
end
end
end