Sha256: 321d0d17b5ee0d90bcdff114f0de04590b557d2a114231157d58b2d64e6de480

Contents?: true

Size: 961 Bytes

Versions: 1

Compression:

Stored size: 961 Bytes

Contents

module LiveQA
  module Processor
    # Buffers payloads in an in-memory queue and hands that queue to a
    # Worker, whose #run is executed on a background thread that is started
    # (or restarted) on demand.
    class Async

      # Queue capacity used when the caller does not supply one.
      DEFAULT_MAX_QUEUE_SIZE = 10_000

      # @param max_queue_size [Integer] maximum number of payloads held in
      #   the queue; once that many are pending, #enqueue rejects new ones.
      def initialize(max_queue_size: DEFAULT_MAX_QUEUE_SIZE)
        @max_queue_size = max_queue_size
        @queue = Queue.new
        @worker = Worker.new(@queue)
        @worker_mutex = Mutex.new

        # At process exit, set the :should_exit thread-local on the worker
        # thread if one was ever started.
        # NOTE(review): presumably Worker#run polls this flag -- confirm.
        at_exit do
          @worker_thread[:should_exit] = true if @worker_thread
        end
      end

      # Adds a payload to the queue and makes sure a worker thread is alive.
      #
      # @param attributes [Object] payload pushed onto the queue as-is.
      # @return [Boolean] false when the queue is already at capacity and
      #   the payload was dropped, true when it was queued.
      def enqueue(attributes)
        # `>=` (not `>`): with `>` the queue could grow to max + 1 entries.
        return false if @queue.length >= @max_queue_size

        @queue << attributes
        ensure_worker_running

        true
      end

      # Blocks the caller until the queue is empty and the worker reports
      # that it is no longer requesting, polling every 100 ms.
      #
      # @return [nil]
      def flush
        while !@queue.empty? || @worker.is_requesting?
          # The worker thread may have finished; restart it so the
          # remaining payloads still get consumed.
          ensure_worker_running
          sleep(0.1)
        end
      end

      private

      # Starts the worker thread unless one is already alive. The liveness
      # check is repeated under the mutex so that concurrent callers do not
      # each spawn a thread.
      def ensure_worker_running
        return if worker_running?
        @worker_mutex.synchronize do
          return if worker_running?
          @worker_thread = Thread.new { @worker.run }
        end
      end

      # @return [Boolean, nil] truthy when a worker thread exists and is
      #   still alive.
      def worker_running?
        @worker_thread && @worker_thread.alive?
      end

    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
liveqa-1.9.0 lib/liveqa/processor/async.rb