Sha256: 20c85587f61b3f44f8f9a26cd24b4a5feeb09fe51a453bbabe80b6631c1818ed
Contents?: true
Size: 988 Bytes
Versions: 3
Compression:
Stored size: 988 Bytes
Contents
module LiveQA module Processor class Async def initialize @queue = Queue.new @state_worker = Concurrent::AtomicBoolean.new(true) @worker = Worker.new(@queue, @state_worker) at_exit do shutdown_worker if worker_running? end end def enqueue(attributes) ensure_worker_running @queue << attributes true end private def shutdown_worker @state_worker.make_false @queue << LiveQA::Processor::Worker::SHUTDOWN_MESSAGE @worker_thread.wait executor.shutdown executor.wait_for_termination end def executor @executor ||= Concurrent.global_io_executor end def ensure_worker_running return if worker_running? @worker_thread = Concurrent::Future.execute { @worker.run } end def worker_running? @worker_thread && @worker_thread.incomplete? end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
liveqa-1.9.6 | lib/liveqa/processor/async.rb |
liveqa-1.9.5 | lib/liveqa/processor/async.rb |
liveqa-1.9.4 | lib/liveqa/processor/async.rb |