Sha256: 20c85587f61b3f44f8f9a26cd24b4a5feeb09fe51a453bbabe80b6631c1818ed

Contents?: true

Size: 988 Bytes

Versions: 3

Compression:

Stored size: 988 Bytes

Contents

module LiveQA
  module Processor
    class Async

      def initialize
        @queue = Queue.new
        @state_worker = Concurrent::AtomicBoolean.new(true)
        @worker = Worker.new(@queue, @state_worker)

        at_exit do
          shutdown_worker if worker_running?
        end
      end

      def enqueue(attributes)
        ensure_worker_running

        @queue << attributes

        true
      end

      private

      def shutdown_worker
        @state_worker.make_false
        @queue << LiveQA::Processor::Worker::SHUTDOWN_MESSAGE
        @worker_thread.wait

        executor.shutdown
        executor.wait_for_termination
      end

      def executor
        @executor ||= Concurrent.global_io_executor
      end

      def ensure_worker_running
        return if worker_running?
        @worker_thread = Concurrent::Future.execute { @worker.run }
      end

      def worker_running?
        @worker_thread && @worker_thread.incomplete?
      end

    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
liveqa-1.9.6 lib/liveqa/processor/async.rb
liveqa-1.9.5 lib/liveqa/processor/async.rb
liveqa-1.9.4 lib/liveqa/processor/async.rb