module LiveQA
  module Processor
    # Drains a message queue into batches and submits each batch through
    # LiveQA::Batch.create, re-queuing batches whose submission failed for
    # as long as the batch itself reports that it may be retried.
    class Worker
      # Seconds to sleep between passes when the queue is empty but batches
      # are still pending.
      DEFAULT_WAIT_TIME = 2

      # HTTP statuses for which a failed batch is logged and dropped instead
      # of being scheduled for a retry.
      NON_RETRYABLE_STATUSES = [401, 404].freeze

      attr_reader :queue, :batches, :executor

      # @param queue [#empty?, #pop] source of messages to send
      # @param executor [#running?] owner consulted once there is no work left
      def initialize(queue, executor)
        @queue = queue
        @batches = []
        @executor = executor
      end

      # Processes messages until there is nothing left to send.
      #
      # Each pass turns queued messages into one new batch (if any are
      # available) and then tries to submit every pending batch.
      #
      # @return [nil]
      def run
        while running?
          return if queue.empty? && batches.empty?

          # An empty queue at this point means only pending batches are left
          # (see the guard above), so pause before trying them again.
          sleep(DEFAULT_WAIT_TIME) if queue.empty?
          create_new_batch unless queue.empty?
          send_batches
        end
      end

      private

      # @return [Boolean] true while work is pending; otherwise whatever the
      #   executor reports
      def running?
        return true unless queue.empty? && batches.empty?

        executor.running?
      end

      # Moves messages from the queue into a fresh batch until the batch is
      # full or the queue is drained, then registers it as pending.
      def create_new_batch
        batch = Batch.new
        batch << queue.pop until batch.full? || queue.empty?
        batches.push(batch)
      end

      # Attempts to submit every pending batch exactly once.
      #
      # The pass walks a snapshot of the pending list (newest first), so a
      # batch that is not ready yet, or that has just been re-queued for a
      # retry, is never examined twice in the same pass and cannot keep the
      # other pending batches from being sent.
      #
      # A request error with a status listed in NON_RETRYABLE_STATUSES is
      # logged, the offending batch is dropped and the pass stops; remaining
      # batches stay pending.
      def send_batches
        batches.reverse.each do |batch|
          next unless batch.can_run?

          remove_batch(batch)

          begin
            LiveQA::Batch.create(Message.base.merge(data: batch.messages))
          rescue LiveQA::RequestError => error
            if NON_RETRYABLE_STATUSES.include?(error.http_status)
              LiveQA.configurations.logger.error(error.message)
              return
            end

            batch_retry(batch)
          rescue Errno::ECONNREFUSED
            batch_retry(batch)
          end
        end
      end

      # Removes the given batch from the pending list, matching by object
      # identity so that no other batch can be removed by accident.
      def remove_batch(batch)
        batches.delete_if { |pending| pending.equal?(batch) }
      end

      # Records a failed attempt on the batch and keeps it pending only while
      # the batch reports it can still be retried.
      def batch_retry(batch)
        batch.update_retry
        batches.push(batch) if batch.can_retry?
      end
    end
  end
end