lib/floe/workflow/states/task.rb in floe-0.4.0 vs lib/floe/workflow/states/task.rb in floe-0.4.1

- old
+ new

@@ -27,37 +27,41 @@ @credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"] end def start(input) super - input = input_path.value(context, input) - input = parameters.value(context, input) if parameters + input = process_input(input) runner_context = runner.run_async!(resource, input, credentials&.value({}, workflow.credentials)) + context.state["RunnerContext"] = runner_context end def status @end ? "success" : "running" end def finish - results = runner.output(context.state["RunnerContext"]) + output = runner.output(context.state["RunnerContext"]) if success? - context.state["Output"] = process_output!(results) + output = parse_output(output) + context.state["Output"] = process_output!(output) context.next_state = next_state else - retry_state!(results) || catch_error!(results) + error = parse_error(output) + retry_state!(error) || catch_error!(error) || fail_workflow!(error) end super ensure runner.cleanup(context.state["RunnerContext"]) end def running? + return true if waiting? + runner.status!(context.state["RunnerContext"]) runner.running?(context.state["RunnerContext"]) end def end? @@ -79,11 +83,11 @@ def find_catcher(error) self.catch.detect { |c| (c.error_equals & [error, "States.ALL"]).any? } end def retry_state!(error) - retrier = find_retrier(error) + retrier = find_retrier(error["Error"]) if error return if retrier.nil? # If a different retrier is hit reset the context if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals context["State"]["RetryCount"] = 0 @@ -92,37 +96,55 @@ context["State"]["RetryCount"] += 1 return if context["State"]["RetryCount"] > retrier.max_attempts - # TODO: Kernel.sleep(retrier.sleep_duration(context["State"]["RetryCount"])) + wait(:seconds => retrier.sleep_duration(context["State"]["RetryCount"])) context.next_state = context.state_name true end def catch_error!(error) - catcher = find_catcher(error) - raise error if catcher.nil? + catcher = find_catcher(error["Error"]) if error + return if catcher.nil? context.next_state = catcher.next + context.output = catcher.result_path.set(context.input, error) + true end + def fail_workflow!(error) + context.next_state = nil + context.output = {"Error" => error["Error"], "Cause" => error["Cause"]}.compact + context.state["Error"] = context.output["Error"] + end + def process_input(input) input = input_path.value(context, input) input = parameters.value(context, input) if parameters input end + def parse_error(output) + return if output.nil? + + JSON.parse(output) + rescue JSON::ParserError + {"Error" => output} + end + + def parse_output(output) + return if output.nil? + + JSON.parse(output.split("\n").last) + rescue JSON::ParserError + nil + end + def process_output!(results) - output = process_input(context.state["Input"]) + output = context.input.dup return output if results.nil? return if output_path.nil? - - begin - results = JSON.parse(results) - rescue JSON::ParserError - results = {"results" => results} - end results = result_selector.value(context, results) if result_selector output = result_path.set(output, results) output_path.value(context, output) end