Sha256: 8aaaa4ec079776195bc05899b109ad9540aefc5153a2448a969926f218eef6a4
Contents?: true
Size: 1.48 KB
Versions: 1
Compression:
Stored size: 1.48 KB
Contents
module Celluloid class Task # Tasks with a Thread backend class Threaded < Task # Run the given block within a task def initialize(type, meta) @resume_queue = Queue.new @exception_queue = Queue.new @yield_mutex = Mutex.new @yield_cond = ConditionVariable.new @thread = nil super end def create # TODO: move this to ActorSystem#get_thread (ThreadHandle inside Group::Pool) thread = Internals::ThreadHandle.new(Thread.current[:celluloid_actor_system], :task) do begin ex = @resume_queue.pop fail ex if ex.is_a?(Task::TerminatedError) yield rescue Exception => ex @exception_queue << ex ensure @yield_mutex.synchronize do @yield_cond.signal end end end @thread = thread end def signal @yield_mutex.synchronize do @yield_cond.signal end @resume_queue.pop end def deliver(value) fail DeadTaskError, "cannot resume a dead task" unless @thread.alive? @yield_mutex.synchronize do @resume_queue.push(value) @yield_cond.wait(@yield_mutex) fail @exception_queue.pop while @exception_queue.size > 0 end rescue ThreadError raise DeadTaskError, "cannot resume a dead task" end def backtrace @thread.backtrace end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
celluloid-0.17.0 | lib/celluloid/task/threaded.rb |