Sha256: edf58209bf0350d21ff390e2a19cabd88feac137a3a732e9a0860e9b2f00bb64
Contents?: true
Size: 1.25 KB
Versions: 9
Compression:
Stored size: 1.25 KB
Contents
module Celluloid
  # Tasks with a Thread backend
  class TaskThread < Task
    # Build the hand-off primitives shared between the caller and the backing
    # thread, then defer to the superclass initializer.
    #
    # type, meta - forwarded unchanged to Task#initialize (bare `super`).
    def initialize(type, meta)
      # Values handed to the task thread by #deliver.
      @resume_queue = Queue.new
      # Exceptions raised inside the task thread, re-raised by #deliver.
      @exception_queue = Queue.new
      # Mutex/condition pair the deliverer sleeps on until the task thread
      # yields control back (via #signal) or finishes.
      @yield_mutex = Mutex.new
      @yield_cond = ConditionVariable.new

      super
    end

    # Start the backing thread for this task.
    #
    # The thread first blocks until a value is delivered. If that value is a
    # Task::TerminatedError it is raised; otherwise the block given to #create
    # is run. Any exception is queued so #deliver can re-raise it in the
    # delivering thread, and the deliverer is always woken on exit.
    def create
      # TODO: move this to ActorSystem#get_thread (ThreadHandle inside InternalPool)
      @thread = ThreadHandle.new(Thread.current[:celluloid_actor_system], :task) do
        begin
          ex = @resume_queue.pop
          raise ex if ex.is_a?(Task::TerminatedError)

          yield
        rescue Exception => ex
          @exception_queue << ex
        ensure
          # Signal while holding the mutex: #deliver holds it from the push
          # until it is parked in #wait, so acquiring it here guarantees the
          # deliverer is already waiting and the wakeup cannot be lost.
          @yield_mutex.synchronize do
            @yield_cond.signal
          end
        end
      end
    end

    # Hand control back to the thread blocked in #deliver, then block until
    # the next value is delivered.
    #
    # Returns the value popped from the resume queue.
    def signal
      # Same lost-wakeup guard as in #create: only signal once the deliverer
      # has released the mutex inside ConditionVariable#wait.
      @yield_mutex.synchronize do
        @yield_cond.signal
      end
      @resume_queue.pop
    end

    # Pass a value to the task thread and block until it signals back.
    #
    # value - object pushed onto the resume queue for the task thread.
    #
    # Raises DeadTaskError if the backing thread is not alive, or if a
    # ThreadError is raised while delivering.
    # Re-raises an exception queued by the task thread, if there is one.
    def deliver(value)
      raise DeadTaskError, "cannot resume a dead task" unless @thread.alive?

      @yield_mutex.synchronize do
        @resume_queue.push(value)
        @yield_cond.wait(@yield_mutex)
        # Surface a failure from the task thread in the delivering thread.
        raise @exception_queue.pop unless @exception_queue.empty?
      end
    rescue ThreadError
      raise DeadTaskError, "cannot resume a dead task"
    end

    # Backtrace of the backing thread, as reported by the thread handle.
    def backtrace
      @thread.backtrace
    end
  end
end
Version data entries
9 entries across 7 versions & 4 rubygems