lib/floe/workflow/states/task.rb in floe-0.11.0 vs lib/floe/workflow/states/task.rb in floe-0.11.3
- old
+ new
@@ -27,45 +27,41 @@
@result_path = ReferencePath.new(payload.fetch("ResultPath", "$"))
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"]
- validate_state!
+ validate_state!(workflow)
+ rescue ArgumentError => err
+ raise Floe::InvalidWorkflowError, err.message
end
- def start(input)
+ def start(context)
super
- input = process_input(input)
- runner_context = runner.run_async!(resource, input, credentials&.value({}, workflow.credentials), context)
+ input = process_input(context)
+ runner_context = runner.run_async!(resource, input, credentials&.value({}, context.credentials), context)
context.state["RunnerContext"] = runner_context
end
- def status
- @end ? "success" : "running"
- end
-
- def finish
+ def finish(context)
output = runner.output(context.state["RunnerContext"])
- if success?
+ if success?(context)
output = parse_output(output)
- context.output = process_output(context.input.dup, output)
- super
+ context.output = process_output(context, output)
else
- context.next_state = nil
- context.output = error = parse_error(output)
- super
- retry_state!(error) || catch_error!(error) || fail_workflow!(error)
+ error = parse_error(output)
+ retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end
+ super
ensure
runner.cleanup(context.state["RunnerContext"])
end
- def running?
- return true if waiting?
+ def running?(context)
+ return true if waiting?(context)
runner.status!(context.state["RunnerContext"])
runner.running?(context.state["RunnerContext"])
end
@@ -75,15 +71,15 @@
private
attr_reader :runner
- def validate_state!
- validate_state_next!
+ def validate_state!(workflow)
+ validate_state_next!(workflow)
end
- def success?
+ def success?(context)
runner.success?(context.state["RunnerContext"])
end
def find_retrier(error)
self.retry.detect { |r| (r.error_equals & [error, "States.ALL"]).any? }
@@ -91,11 +87,11 @@
def find_catcher(error)
self.catch.detect { |c| (c.error_equals & [error, "States.ALL"]).any? }
end
- def retry_state!(error)
+ def retry_state!(context, error)
retrier = find_retrier(error["Error"]) if error
return if retrier.nil?
# If a different retrier is hit reset the context
if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals
@@ -105,30 +101,33 @@
context["State"]["RetryCount"] += 1
return if context["State"]["RetryCount"] > retrier.max_attempts
- wait_until!(:seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
+ wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
- logger.info("Running state: [#{long_name}] with input [#{context.input}]...Retry - delay: #{wait_until}")
+ context.output = error
+ logger.info("Running state: [#{long_name}] with input [#{context.input}] got error[#{context.output}]...Retry - delay: #{wait_until(context)}")
true
end
- def catch_error!(error)
+ def catch_error!(context, error)
catcher = find_catcher(error["Error"]) if error
return if catcher.nil?
context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
logger.info("Running state: [#{long_name}] with input [#{context.input}]...CatchError - next state: [#{context.next_state}] output: [#{context.output}]")
true
end
- def fail_workflow!(error)
- context.next_state = nil
- context.output = {"Error" => error["Error"], "Cause" => error["Cause"]}.compact
+ def fail_workflow!(context, error)
+ # next_state is nil, and will be set to nil again in super
+ # keeping in here for completeness
+ context.next_state = nil
+ context.output = error
logger.error("Running state: [#{long_name}] with input [#{context.input}]...Complete workflow - output: [#{context.output}]")
end
def parse_error(output)
return if output.nil?
@@ -144,13 +143,9 @@
return if output.nil? || output.empty?
JSON.parse(output.split("\n").last)
rescue JSON::ParserError
nil
- end
-
- def next_state
- end? ? nil : @next
end
end
end
end
end