lib/floe/workflow/states/task.rb in floe-0.11.0 vs lib/floe/workflow/states/task.rb in floe-0.11.3

- old
+ new

@@ -27,45 +27,41 @@ @result_path = ReferencePath.new(payload.fetch("ResultPath", "$")) @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] @credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"] - validate_state! + validate_state!(workflow) + rescue ArgumentError => err + raise Floe::InvalidWorkflowError, err.message end - def start(input) + def start(context) super - input = process_input(input) - runner_context = runner.run_async!(resource, input, credentials&.value({}, workflow.credentials), context) + input = process_input(context) + runner_context = runner.run_async!(resource, input, credentials&.value({}, context.credentials), context) context.state["RunnerContext"] = runner_context end - def status - @end ? "success" : "running" - end - - def finish + def finish(context) output = runner.output(context.state["RunnerContext"]) - if success? + if success?(context) output = parse_output(output) - context.output = process_output(context.input.dup, output) - super + context.output = process_output(context, output) else - context.next_state = nil - context.output = error = parse_error(output) - super - retry_state!(error) || catch_error!(error) || fail_workflow!(error) + error = parse_error(output) + retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error) end + super ensure runner.cleanup(context.state["RunnerContext"]) end - def running? - return true if waiting? + def running?(context) + return true if waiting?(context) runner.status!(context.state["RunnerContext"]) runner.running?(context.state["RunnerContext"]) end @@ -75,15 +71,15 @@ private attr_reader :runner - def validate_state! - validate_state_next! + def validate_state!(workflow) + validate_state_next!(workflow) end - def success? + def success?(context) runner.success?(context.state["RunnerContext"]) end def find_retrier(error) self.retry.detect { |r| (r.error_equals & [error, "States.ALL"]).any? } @@ -91,11 +87,11 @@ def find_catcher(error) self.catch.detect { |c| (c.error_equals & [error, "States.ALL"]).any? } end - def retry_state!(error) + def retry_state!(context, error) retrier = find_retrier(error["Error"]) if error return if retrier.nil? # If a different retrier is hit reset the context if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals @@ -105,30 +101,33 @@ context["State"]["RetryCount"] += 1 return if context["State"]["RetryCount"] > retrier.max_attempts - wait_until!(:seconds => retrier.sleep_duration(context["State"]["RetryCount"])) + wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"])) context.next_state = context.state_name - logger.info("Running state: [#{long_name}] with input [#{context.input}]...Retry - delay: #{wait_until}") + context.output = error + logger.info("Running state: [#{long_name}] with input [#{context.input}] got error[#{context.output}]...Retry - delay: #{wait_until(context)}") true end - def catch_error!(error) + def catch_error!(context, error) catcher = find_catcher(error["Error"]) if error return if catcher.nil? context.next_state = catcher.next context.output = catcher.result_path.set(context.input, error) logger.info("Running state: [#{long_name}] with input [#{context.input}]...CatchError - next state: [#{context.next_state}] output: [#{context.output}]") true end - def fail_workflow!(error) - context.next_state = nil - context.output = {"Error" => error["Error"], "Cause" => error["Cause"]}.compact + def fail_workflow!(context, error) + # next_state is nil, and will be set to nil again in super + # keeping in here for completeness + context.next_state = nil + context.output = error logger.error("Running state: [#{long_name}] with input [#{context.input}]...Complete workflow - output: [#{context.output}]") end def parse_error(output) return if output.nil? @@ -144,13 +143,9 @@ return if output.nil? || output.empty? JSON.parse(output.split("\n").last) rescue JSON::ParserError nil - end - - def next_state - end? ? nil : @next end end end end end