Sha256: 0214923f6d4d72a0917c0435fa2ef314852ccc9c2883a96ded72e1ccff061561

Contents?: true

Size: 1.4 KB

Versions: 1

Compression:

Stored size: 1.4 KB

Contents

module LiveQA
  module Processor
    ##
    # Drains a message queue into batches and delivers each batch through
    # LiveQA::Batch.create, re-queuing batches whose delivery failed.
    class Worker

      # Seconds to sleep when the queue is empty but batches are still pending.
      DEFAULT_WAIT_TIME = 2

      # HTTP statuses for which a failed batch is logged and dropped
      # instead of being scheduled for a retry.
      UNRECOVERABLE_STATUSES = [401, 404].freeze

      attr_reader :queue
      attr_reader :lock
      attr_reader :batches

      ##
      # @param queue [#empty?, #pop] source of the messages to deliver
      def initialize(queue)
        @queue = queue
        @lock = Mutex.new
        @batches = []
      end

      ##
      # Main loop. Runs until the current thread's +:should_exit+ flag is set,
      # or returns as soon as neither queued messages nor pending batches
      # remain.
      def run
        until Thread.current[:should_exit]
          return if queue.empty? && batches.empty?

          # Nothing new to batch, but some batches are still pending:
          # wait before giving them another delivery pass.
          sleep(DEFAULT_WAIT_TIME) if queue.empty?

          create_new_batch unless queue.empty?

          send_batches
        end
      end

      private

      ##
      # Moves messages from the queue into a fresh batch until the batch is
      # full or the queue is empty, then appends the batch to the pending list.
      def create_new_batch
        batch = Batch.new
        lock.synchronize do
          batch << queue.pop until batch.full? || queue.empty?
        end
        batches.push(batch)
      end

      ##
      # Gives every batch that is pending when the call starts at most one
      # delivery attempt.
      #
      # Batches are taken from the FRONT of the list and re-queued at the
      # BACK. Taking and re-queuing at the same end would hand the same batch
      # back on the next iteration, so a batch that is not ready to run (or
      # that was just scheduled for a retry) would use up the whole pass and
      # keep every other batch from being sent.
      #
      # A request error with a status listed in UNRECOVERABLE_STATUSES is
      # logged, the offending batch is dropped and the pass stops; batches not
      # visited yet stay pending for the next pass.
      def send_batches
        batches.size.times do
          batch = batches.shift

          unless batch.can_run?
            batches.push(batch)
            next
          end

          begin
            LiveQA::Batch.create(Message.base.merge(data: batch.messages))
          rescue LiveQA::RequestError => error
            if UNRECOVERABLE_STATUSES.include?(error.http_status)
              return LiveQA.configurations.logger.error(error.message)
            end

            batch_retry(batch)
          rescue Errno::ECONNREFUSED
            batch_retry(batch)
          end
        end
      end

      ##
      # Records a failed attempt on the batch and re-queues it at the back of
      # the pending list while the batch still reports that it can be retried;
      # otherwise the batch is dropped.
      def batch_retry(batch)
        batch.update_retry
        batches.push(batch) if batch.can_retry?
      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
liveqa-1.9.0 lib/liveqa/processor/worker.rb