lib/floe/workflow/states/task.rb in floe-0.4.0 vs lib/floe/workflow/states/task.rb in floe-0.4.1
- old
+ new
@@ -27,37 +27,41 @@
@credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"]
end
def start(input)
super
- input = input_path.value(context, input)
- input = parameters.value(context, input) if parameters
+ input = process_input(input)
runner_context = runner.run_async!(resource, input, credentials&.value({}, workflow.credentials))
+
context.state["RunnerContext"] = runner_context
end
def status
@end ? "success" : "running"
end
def finish
- results = runner.output(context.state["RunnerContext"])
+ output = runner.output(context.state["RunnerContext"])
if success?
- context.state["Output"] = process_output!(results)
+ output = parse_output(output)
+ context.state["Output"] = process_output!(output)
context.next_state = next_state
else
- retry_state!(results) || catch_error!(results)
+ error = parse_error(output)
+ retry_state!(error) || catch_error!(error) || fail_workflow!(error)
end
super
ensure
runner.cleanup(context.state["RunnerContext"])
end
def running?
+ return true if waiting?
+
runner.status!(context.state["RunnerContext"])
runner.running?(context.state["RunnerContext"])
end
def end?
@@ -79,11 +83,11 @@
def find_catcher(error)
self.catch.detect { |c| (c.error_equals & [error, "States.ALL"]).any? }
end
def retry_state!(error)
- retrier = find_retrier(error)
+ retrier = find_retrier(error["Error"]) if error
return if retrier.nil?
# If a different retrier is hit reset the context
if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals
context["State"]["RetryCount"] = 0
@@ -92,37 +96,55 @@
context["State"]["RetryCount"] += 1
return if context["State"]["RetryCount"] > retrier.max_attempts
- # TODO: Kernel.sleep(retrier.sleep_duration(context["State"]["RetryCount"]))
+ wait(:seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
true
end
def catch_error!(error)
- catcher = find_catcher(error)
- raise error if catcher.nil?
+ catcher = find_catcher(error["Error"]) if error
+ return if catcher.nil?
context.next_state = catcher.next
+ context.output = catcher.result_path.set(context.input, error)
+ true
end
+ def fail_workflow!(error)
+ context.next_state = nil
+ context.output = {"Error" => error["Error"], "Cause" => error["Cause"]}.compact
+ context.state["Error"] = context.output["Error"]
+ end
+
def process_input(input)
input = input_path.value(context, input)
input = parameters.value(context, input) if parameters
input
end
+ def parse_error(output)
+ return if output.nil?
+
+ JSON.parse(output)
+ rescue JSON::ParserError
+ {"Error" => output}
+ end
+
+ def parse_output(output)
+ return if output.nil?
+
+ JSON.parse(output.split("\n").last)
+ rescue JSON::ParserError
+ nil
+ end
+
def process_output!(results)
- output = process_input(context.state["Input"])
+ output = context.input.dup
return output if results.nil?
return if output_path.nil?
-
- begin
- results = JSON.parse(results)
- rescue JSON::ParserError
- results = {"results" => results}
- end
results = result_selector.value(context, results) if result_selector
output = result_path.set(output, results)
output_path.value(context, output)
end