Sha256: 0214923f6d4d72a0917c0435fa2ef314852ccc9c2883a96ded72e1ccff061561
Contents?: true
Size: 1.4 KB
Versions: 1
Compression:
Stored size: 1.4 KB
Contents
module LiveQA
  module Processor
    ##
    # Moves messages from a queue into batches and delivers each batch through
    # LiveQA::Batch.create, re-queueing batches whose delivery failed with a
    # retryable error.
    class Worker
      # Seconds slept by #run when the queue is empty but batches are pending.
      DEFAULT_WAIT_TIME = 2

      # HTTP statuses for which a failed batch is logged and dropped rather
      # than retried.
      UNRECOVERABLE_STATUSES = [401, 404].freeze

      # queue   - source of messages handed to #initialize.
      # lock    - Mutex held while messages are moved from the queue to a batch.
      # batches - Array of batches that still have to be delivered.
      attr_reader :queue, :lock, :batches

      ##
      # @param queue [#empty?, #pop] source of messages to deliver
      def initialize(queue)
        @queue = queue
        @lock = Mutex.new
        @batches = []
      end

      ##
      # Batch and send messages until the current thread is flagged with
      # +Thread.current[:should_exit]+, or until both the queue and the
      # pending batches are empty.
      def run
        until Thread.current[:should_exit]
          return if queue.empty? && batches.empty?

          # Nothing new to batch: wait before looking at pending batches again.
          sleep(DEFAULT_WAIT_TIME) if queue.empty?

          create_new_batch unless queue.empty?
          send_batches
        end
      end

      private

      ##
      # Fill one new batch from the queue (until the batch reports full or the
      # queue is empty) and add it to the pending batches.
      def create_new_batch
        batch = Batch.new

        lock.synchronize do
          batch << queue.pop until batch.full? || queue.empty?
        end

        batches.push(batch)
      end

      ##
      # Try to deliver every batch that is pending when the pass starts.
      #
      # Batches are taken from the head of the list and re-queued at the tail,
      # and the pass is bounded by the number of batches pending at its start.
      # Each batch is therefore examined exactly once per pass: a batch that
      # cannot run yet, or that was just re-queued for retry, no longer
      # prevents the batches behind it from being sent.
      #
      # A LiveQA::RequestError with a status in UNRECOVERABLE_STATUSES is
      # logged, the batch is dropped and the pass stops; batches that were not
      # examined yet stay pending.
      def send_batches
        batches.size.times do
          batch = batches.shift

          unless batch.can_run?
            batches.push(batch)
            next
          end

          begin
            LiveQA::Batch.create(Message.base.merge(data: batch.messages))
          rescue LiveQA::RequestError => error
            if UNRECOVERABLE_STATUSES.include?(error.http_status)
              return LiveQA.configurations.logger.error(error.message)
            end

            batch_retry(batch)
          rescue Errno::ECONNREFUSED
            batch_retry(batch)
          end
        end
      end

      ##
      # Record a failed attempt on the batch and keep it pending only while
      # the batch reports that it can still be retried.
      def batch_retry(batch)
        batch.update_retry
        batches.push(batch) if batch.can_retry?
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
liveqa-1.9.0 | lib/liveqa/processor/worker.rb |