require 'celluloid'
require 'securerandom'

module Nestene
  module Actor
    # Celluloid actor that reacts to 'state_update' notifications by taking the
    # next queued method of an auton, running it, and writing the outcome back
    # through the auton's storage actor (registered as "storage:<auton_id>").
    class AutonQueue
      include Celluloid
      include Celluloid::Notifications

      # Name of the pseudo-step that shuts the auton's storage actor down
      # instead of being invoked on the auton instance.
      TERMINATE_STEP = '__terminate_this_auton'.freeze

      # Method scheduled on the auton (when it defines one) after a step raised.
      EXCEPTION_HANDLER = :handle_exception

      # Subscribes this actor to the 'state_update' notification topic.
      def initialize
        subscribe('state_update', :execute_next_step)
      end

      # Notification callback: runs at most one pending step of the auton.
      #
      # @param topic [String] notification topic (unused)
      # @param auton_id [Object] id used to locate the "storage:<id>" actor
      # @param state [Object, nil] published state; a falsy value is ignored
      # @return [void]
      def execute_next_step(topic, auton_id, state)
        return unless state

        storage_actor = storage_for(auton_id)
        return unless storage_actor

        executing, type, serialized = dequeue_next_step(storage_actor)
        return unless executing

        # Step names may arrive as Strings or Symbols (the callback path below
        # already normalises with to_s), so compare on the String form.
        if executing.name.to_s == TERMINATE_STEP
          storage_actor.async.shutdown
          return
        end

        instance = build_instance(type, serialized, auton_id)
        result, exception = run_step(instance, executing)

        executed = ExecutedMethod.new(executing)
        executed.result = result
        executed.error = exception if exception

        notify_callback(executed, result, exception)
        record_outcome(auton_id, instance, executing, executed, exception)
      end

      private

      # Looks up the storage actor registered for the given auton (nil if none).
      def storage_for(auton_id)
        Celluloid::Actor["storage:%s" % auton_id]
      end

      # Inside the storage actor's update block, shifts the next method off the
      # queue when the stored state is :ready and something is waiting, and
      # marks it as currently executing.
      #
      # @return [Array] [executing, type, serialized]; all nil when nothing ran
      def dequeue_next_step(storage_actor)
        executing = type = serialized = nil

        storage_actor.update do |stored|
          if stored && stored.state == :ready && !stored.queue.to_execute.empty?
            executing = ExecutingMethod.new(stored.queue.to_execute.shift)
            stored.queue.currently_executing = executing
            type = stored.type
            serialized = stored.serialized
          end
        end

        [executing, type, serialized]
      end

      # Rebuilds the auton object from its serialized structure and hands it an
      # AutonContext when it exposes a public context= writer.
      def build_instance(type, serialized, auton_id)
        instance = Nestene::class_from_string(type).from_structure(serialized)
        instance.context = AutonContext.new(auton_id) if instance.respond_to?(:context=)
        instance
      end

      # Invokes the step through a Celluloid future and waits for its value.
      #
      # @return [Array] [result, exception]; exactly one of them is nil unless
      #   the step itself returned nil
      def run_step(instance, executing)
        future = Celluloid::Future.new do
          instance.method(executing.name).call(*executing.parameters)
        end

        begin
          [future.value, nil]
        rescue Exception => e # rubocop:disable Lint/RescueException
          # Deliberately broad: whatever the auton's code raises is recorded as
          # the step's error instead of propagating out of this actor.
          # NOTE(review): this also captures non-StandardError exceptions;
          # confirm that is intended before narrowing.
          [nil, e]
        end
      end

      # Schedules the step's callback (if it has one) on the :nestene_core
      # actor: "<name>_error" with [class name, message] on failure, otherwise
      # the callback name with the step's result.
      def notify_callback(executed, result, exception)
        callback = executed.callback
        return unless callback

        core = Celluloid::Actor[:nestene_core]
        if exception
          core.async.schedule_step(callback.auton_id,
                                   "%s_error" % callback.name.to_s,
                                   [exception.class.name, exception.message])
        else
          core.async.schedule_step(callback.auton_id, callback.name, result)
        end
      end

      # Writes the outcome back: clears currently_executing, stores the auton's
      # new structure, schedules the exception handler or flags the queue as
      # failed when the step raised, and appends the executed record.
      def record_outcome(auton_id, instance, executing, executed, exception)
        # Looked up again because the step may have run for a while; if the
        # storage actor is no longer registered there is nowhere to record to.
        storage_actor = storage_for(auton_id)
        return unless storage_actor

        storage_actor.update do |stored|
          if stored
            stored.queue.currently_executing = nil
            stored.serialized = instance.to_structure
            register_failure(stored.queue, instance, executing, exception) if exception
            stored.queue.add_executed(executed)
          end
        end
      end

      # Puts a handle_exception call at the front of the queue when the auton
      # defines that method and the failing step was not the handler itself
      # (avoids rescheduling the handler endlessly); otherwise marks the queue
      # as failed.
      def register_failure(queue, instance, executing, exception)
        handler_available = instance.methods.include?(EXCEPTION_HANDLER)

        if handler_available && executing.name.to_s != EXCEPTION_HANDLER.to_s
          handler = ScheduledMethod.new
          handler.name = EXCEPTION_HANDLER
          handler.parameters = [exception.class.name, exception.message,
                                executing.name, executing.parameters]
          handler.uuid = SecureRandom.uuid
          queue.to_execute.unshift(handler)
        else
          queue.failed = true
        end
      end
    end
  end
end