Sha256: 6c8d65d2072fa6737d9a594842747752c45e888018a8e8afc87d8e3dda69e844

Contents?: true

Size: 1.49 KB

Versions: 1

Compression:

Stored size: 1.49 KB

Contents

module LiveQA
  module Processor
    class Worker

      DEFAULT_WAIT_TIME = 2

      attr_reader :queue
      attr_reader :batches
      attr_reader :executor

      def initialize(queue, executor)
        @queue = queue
        @batches = []

        @executor = executor
      end

      def run
        while running?
          return if queue.empty? && batches.empty?

          sleep(DEFAULT_WAIT_TIME) if queue.empty?

          create_new_batch unless queue.empty?

          send_batches
        end
      end

      private

      def running?
        return true if !queue.empty? || !batches.empty?
        executor.running?
      end

      def create_new_batch
        batch = Batch.new
        batch << queue.pop until batch.full? || queue.empty?
        batches.push(batch)
      end

      def send_batches
        batches.dup.each do
          batch = batches.pop

          unless batch.can_run?
            batches.push(batch)
            next
          end

          begin
            LiveQA::Batch.create(Message.base.merge(data: batch.messages))
          rescue LiveQA::RequestError => error
            return LiveQA.configurations.logger.error(error.message) if [401, 404].include?(error.http_status)
            batch_retry(batch)
          rescue Errno::ECONNREFUSED, OpenSSL::SSL::SSLError => _error
            batch_retry(batch)
          end
        end
      end

      def batch_retry(batch)
        batch.update_retry
        batches.push(batch) if batch.can_retry?
      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
liveqa-1.9.3 lib/liveqa/processor/worker.rb