Sha256: 6c8d65d2072fa6737d9a594842747752c45e888018a8e8afc87d8e3dda69e844
Contents?: true
Size: 1.49 KB
Versions: 1
Compression:
Stored size: 1.49 KB
Contents
module LiveQA module Processor class Worker DEFAULT_WAIT_TIME = 2 attr_reader :queue attr_reader :batches attr_reader :executor def initialize(queue, executor) @queue = queue @batches = [] @executor = executor end def run while running? return if queue.empty? && batches.empty? sleep(DEFAULT_WAIT_TIME) if queue.empty? create_new_batch unless queue.empty? send_batches end end private def running? return true if !queue.empty? || !batches.empty? executor.running? end def create_new_batch batch = Batch.new batch << queue.pop until batch.full? || queue.empty? batches.push(batch) end def send_batches batches.dup.each do batch = batches.pop unless batch.can_run? batches.push(batch) next end begin LiveQA::Batch.create(Message.base.merge(data: batch.messages)) rescue LiveQA::RequestError => error return LiveQA.configurations.logger.error(error.message) if [401, 404].include?(error.http_status) batch_retry(batch) rescue Errno::ECONNREFUSED, OpenSSL::SSL::SSLError => _error batch_retry(batch) end end end def batch_retry(batch) batch.update_retry batches.push(batch) if batch.can_retry? end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
liveqa-1.9.3 | lib/liveqa/processor/worker.rb |