lib/pione/agent/task-worker.rb in pione-0.1.2 vs lib/pione/agent/task-worker.rb in pione-0.1.3

- old
+ new

@@ -104,17 +104,15 @@ @env = ENV.clone end # Transition method for the state +task_waiting+. The agent takes a +task+ # tuple and writes a +working+ tuple. + # # @return [Task] # task tuple def transit_to_task_waiting - @task = nil - @rule = nil task = take(Tuple[:task].new(features: @features)) - @task = task begin write(Tuple[:working].new(task.domain, task.digest)) write(Tuple[:foreground].new(task.domain, task.digest)) rescue Rinda::RedundantTupleError raise Restart.new @@ -124,27 +122,16 @@ # Transition method for the state +rule_loading+. # @return [Array<Task, Rule>] # task tuple and the rule def transit_to_rule_loading(task) - rule = - begin - read0(Tuple[:rule].new(rule_path: task.rule_path)) - rescue Rinda::RequestExpiredError - log do |msg| - msg.add_record(agent_type, "action", "request_rule") - msg.add_record(agent_type, "object", task.rule_path) - end - write(Tuple[:request_rule].new(task.rule_path)) - read(Tuple[:rule].new(rule_path: task.rule_path)) - end - @rule = rule.content - if rule.status == :known - return task, rule.content - else - raise UnknownRuleError.new(task) + rule = read!(Tuple[:rule].new(rule_path: task.rule_path)) + unless rule + write(Tuple[:request_rule].new(task.rule_path)) + rule = read(Tuple[:rule].new(rule_path: task.rule_path)) end + return task, rule.content end # Transition method for the state +task_executing+. # @param [Task] task # task tuple @@ -175,17 +162,20 @@ while @handler_thread.alive? do if @child_agent.nil? or not(@child_agent.running_thread.alive?) debug_message "+++ Create Sub Task worker +++" @child_agent = self.class.new(tuple_space_server, @features) @child_agent.once = true - log do |msg| - msg.add_record(agent_type, "action", "create_sub_task_worker") - msg.add_record(agent_type, "uuid", uuid) - msg.add_record(agent_type, "object", @child_agent.uuid) + + Log::CreateChildTaskWorkerProcessRecord.new.tap do |record| + record.parent = uuid + record.child = @child_agent.uuid + + with_process_log(record) do + take!(Tuple[:foreground].new(task.domain, nil)) + @child_agent.start + end end - take0(Tuple[:foreground].new(task.domain, nil)) rescue true - @child_agent.start else tail = descendant.last if tail.action? tail.running_thread.join else @@ -207,11 +197,11 @@ @flow = nil @action = nil # remove the working tuple - take0(Tuple[:working].new(task.domain, nil)) rescue true + take!(Tuple[:working].new(task.domain, nil)) debug_message_end "End Task Execution #{rule.rule_path} by worker(#{uuid})" return task, rule, handler, @__result_task_execution__ end @@ -243,19 +233,22 @@ # task tuple # @param [RuleHandler] handler # the handler # @return [void] def transit_to_task_finishing(task, handler) - log do |msg| - msg.add_record(agent_type, "action", "finished_task") - msg.add_record(agent_type, "uuid", uuid) - msg.add_record(agent_type, "object", task) - end + # put finished tuple finished = Tuple[:finished].new( handler.domain, :succeeded, handler.outputs, task.digest ) write(finished) - take0(Tuple[:foreground].new(task.domain, nil)) rescue true + # log task process + record = handler.task_process_record.merge(transition: "complete") + process_log(record) + + # remove foreground + take!(Tuple[:foreground].new(task.domain, nil)) + + # terminate if the agent is child terminate if @once end # State error def transit_to_error(e)