lib/nestene/actor/auton_queue.rb in nestene-0.2.0 vs lib/nestene/actor/auton_queue.rb in nestene-0.2.1
- old
+ new
@@ -10,28 +10,42 @@
subscribe('state_update', :execute_next_step)
end
def execute_next_step topic, auton_id, state
+
+ return unless state
+
executing = nil
type = nil
ser = nil
running = false
- Celluloid::Actor["storage:%s" % auton_id].update do |state|
- running = state.state == :ready && !state.queue.to_execute.empty?
- if running
- nx = state.queue.to_execute.shift
- executing = ExecutingMethod.new(nx)
- state.queue.currently_executing = executing
- type = state.type
- ser = state.serialized
+ storage_actor = Celluloid::Actor["storage:%s" % auton_id]
+
+ return unless storage_actor
+
+ storage_actor.update do |state|
+ if state
+ running = state.state == :ready && !state.queue.to_execute.empty?
+ if running
+ nx = state.queue.to_execute.shift
+ executing = ExecutingMethod.new(nx)
+ state.queue.currently_executing = executing
+ type = state.type
+ ser = state.serialized
+ end
end
end
return unless running
+ if executing.name == '__terminate_this_auton'
+ Celluloid::Actor["storage:%s" % auton_id].async.shutdown
+ return
+ end
+
instance = Nestene::class_from_string(type).from_structure(ser)
if instance.public_methods.include?(:context=)
instance.public_send(:context=, AutonContext.new(auton_id))
end
@@ -63,23 +77,25 @@
Celluloid::Actor[:nestene_core].async.schedule_step executed.callback.auton_id, executed.callback.name, result
end
end
Celluloid::Actor["storage:%s" % auton_id].update do |state|
- state.queue.currently_executing = nil
- state.serialized = instance.to_structure
- if exception
- if instance.methods.include?(:handle_exception) && executing.name != :handle_exception
- method = ScheduledMethod.new
- method.name = :handle_exception
- method.parameters = [exception.class.name, exception.message, executing.name, executing.parameters]
- method.uuid = SecureRandom.uuid
- state.queue.to_execute.unshift method
- else
- state.queue.failed = true
+ if state
+ state.queue.currently_executing = nil
+ state.serialized = instance.to_structure
+ if exception
+ if instance.methods.include?(:handle_exception) && executing.name != :handle_exception
+ method = ScheduledMethod.new
+ method.name = :handle_exception
+ method.parameters = [exception.class.name, exception.message, executing.name, executing.parameters]
+ method.uuid = SecureRandom.uuid
+ state.queue.to_execute.unshift method
+ else
+ state.queue.failed = true
+ end
end
+ state.queue.add_executed executed
end
- state.queue.add_executed executed
end
end
end
end