lib/pione/agent/task-worker.rb in pione-0.1.2 vs lib/pione/agent/task-worker.rb in pione-0.1.3
- old
+ new
@@ -104,17 +104,15 @@
@env = ENV.clone
end
# Transition method for the state +task_waiting+. The agent takes a +task+
# tuple and writes a +working+ tuple.
+ #
# @return [Task]
# task tuple
def transit_to_task_waiting
- @task = nil
- @rule = nil
task = take(Tuple[:task].new(features: @features))
- @task = task
begin
write(Tuple[:working].new(task.domain, task.digest))
write(Tuple[:foreground].new(task.domain, task.digest))
rescue Rinda::RedundantTupleError
raise Restart.new
@@ -124,27 +122,16 @@
# Transition method for the state +rule_loading+.
# @return [Array<Task, Rule>]
# task tuple and the rule
def transit_to_rule_loading(task)
- rule =
- begin
- read0(Tuple[:rule].new(rule_path: task.rule_path))
- rescue Rinda::RequestExpiredError
- log do |msg|
- msg.add_record(agent_type, "action", "request_rule")
- msg.add_record(agent_type, "object", task.rule_path)
- end
- write(Tuple[:request_rule].new(task.rule_path))
- read(Tuple[:rule].new(rule_path: task.rule_path))
- end
- @rule = rule.content
- if rule.status == :known
- return task, rule.content
- else
- raise UnknownRuleError.new(task)
+ rule = read!(Tuple[:rule].new(rule_path: task.rule_path))
+ unless rule
+ write(Tuple[:request_rule].new(task.rule_path))
+ rule = read(Tuple[:rule].new(rule_path: task.rule_path))
end
+ return task, rule.content
end
# Transition method for the state +task_executing+.
# @param [Task] task
# task tuple
@@ -175,17 +162,20 @@
while @handler_thread.alive? do
if @child_agent.nil? or not(@child_agent.running_thread.alive?)
debug_message "+++ Create Sub Task worker +++"
@child_agent = self.class.new(tuple_space_server, @features)
@child_agent.once = true
- log do |msg|
- msg.add_record(agent_type, "action", "create_sub_task_worker")
- msg.add_record(agent_type, "uuid", uuid)
- msg.add_record(agent_type, "object", @child_agent.uuid)
+
+ Log::CreateChildTaskWorkerProcessRecord.new.tap do |record|
+ record.parent = uuid
+ record.child = @child_agent.uuid
+
+ with_process_log(record) do
+ take!(Tuple[:foreground].new(task.domain, nil))
+ @child_agent.start
+ end
end
- take0(Tuple[:foreground].new(task.domain, nil)) rescue true
- @child_agent.start
else
tail = descendant.last
if tail.action?
tail.running_thread.join
else
@@ -207,11 +197,11 @@
@flow = nil
@action = nil
# remove the working tuple
- take0(Tuple[:working].new(task.domain, nil)) rescue true
+ take!(Tuple[:working].new(task.domain, nil))
debug_message_end "End Task Execution #{rule.rule_path} by worker(#{uuid})"
return task, rule, handler, @__result_task_execution__
end
@@ -243,19 +233,22 @@
# task tuple
# @param [RuleHandler] handler
# the handler
# @return [void]
def transit_to_task_finishing(task, handler)
- log do |msg|
- msg.add_record(agent_type, "action", "finished_task")
- msg.add_record(agent_type, "uuid", uuid)
- msg.add_record(agent_type, "object", task)
- end
+ # put finished tuple
finished = Tuple[:finished].new(
handler.domain, :succeeded, handler.outputs, task.digest
)
write(finished)
- take0(Tuple[:foreground].new(task.domain, nil)) rescue true
+ # log task process
+ record = handler.task_process_record.merge(transition: "complete")
+ process_log(record)
+
+ # remove foreground
+ take!(Tuple[:foreground].new(task.domain, nil))
+
+ # terminate if the agent is child
terminate if @once
end
# State error
def transit_to_error(e)