Sha256: de6158c3a015b4a6b743ce546c8b538d523605b4e2f059357f8d191d488caab9
Contents?: true
Size: 1.48 KB
Versions: 7
Compression:
Stored size: 1.48 KB
Contents
module Celluloid
  class Task
    # A Task implementation that runs its body on a dedicated thread.
    #
    # Control is passed back and forth between the caller and the task thread
    # using a resume queue (caller -> task) and a mutex-guarded condition
    # variable (task -> caller).
    class Threaded < Task
      # Prepares the synchronisation primitives, then defers to Task#initialize.
      #
      # @param type forwarded unchanged to the superclass
      # @param meta forwarded unchanged to the superclass
      def initialize(type, meta)
        @resume_queue    = Queue.new
        @exception_queue = Queue.new
        @yield_mutex     = Mutex.new
        @yield_cond      = ConditionVariable.new
        @thread          = nil

        # Bare `super` passes (type, meta) and any given block along.
        super
      end

      # Builds the thread handle that will execute the block given to #create.
      #
      # The thread body first blocks until a value arrives on the resume queue.
      # A TaskTerminated value is raised instead of running the block. Any
      # exception leaving the body is stored on the exception queue, and the
      # condition variable is always signalled when the body finishes.
      #
      # @return the newly created thread handle (also stored in @thread)
      def create
        # TODO: move this to ActorSystem#get_thread (ThreadHandle inside Group::Pool)
        @thread = Internals::ThreadHandle.new(Thread.current[:celluloid_actor_system], :task) do
          begin
            first_value = @resume_queue.pop
            raise first_value if first_value.is_a?(TaskTerminated)

            yield
          rescue ::Exception => error
            # Hand every failure over to #deliver, which re-raises it.
            @exception_queue.push(error)
          ensure
            @yield_mutex.synchronize { @yield_cond.signal }
          end
        end
      end

      # Signals the condition variable a #deliver call waits on, then blocks
      # until the next value is pushed onto the resume queue.
      #
      # @return the value popped from the resume queue
      def signal
        @yield_mutex.synchronize { @yield_cond.signal }
        @resume_queue.pop
      end

      # Pushes +value+ onto the resume queue and waits on the condition
      # variable until it is signalled (by #signal or by the thread body
      # finishing).
      #
      # @param value the object handed to the task thread
      # @raise [DeadTaskError] if the thread is not alive, or a ThreadError occurs
      # @raise the exception stored on the exception queue, if there is one
      def deliver(value)
        raise DeadTaskError, "cannot resume a dead task" unless @thread.alive?

        @yield_mutex.synchronize do
          # Push and wait under the same mutex the signaller takes, so the
          # signal cannot be sent before this thread is waiting.
          @resume_queue.push(value)
          @yield_cond.wait(@yield_mutex)
          raise @exception_queue.pop unless @exception_queue.empty?
        end
      rescue ThreadError
        raise DeadTaskError, "cannot resume a dead task"
      end

      # @return the backtrace reported by the underlying thread handle
      def backtrace
        @thread.backtrace
      end
    end
  end
end
Version data entries
7 entries across 7 versions & 1 rubygems