lib/floe/workflow/states/task.rb in floe-0.3.1 vs lib/floe/workflow/states/task.rb in floe-0.4.0
- old
+ new
@@ -13,10 +13,11 @@
@heartbeat_seconds = payload["HeartbeatSeconds"]
@next = payload["Next"]
@end = !!payload["End"]
@resource = payload["Resource"]
+ @runner = Floe::Workflow::Runner.for_resource(@resource)
@timeout_seconds = payload["TimeoutSeconds"]
@retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) }
@catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) }
@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))
@@ -24,40 +25,65 @@
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"]
end
- def run!(input)
+ def start(input)
+ super
input = input_path.value(context, input)
input = parameters.value(context, input) if parameters
- runner = Floe::Workflow::Runner.for_resource(resource)
- _exit_status, results = runner.run!(resource, input, credentials&.value({}, workflow.credentials))
+ runner_context = runner.run_async!(resource, input, credentials&.value({}, workflow.credentials))
+ context.state["RunnerContext"] = runner_context
+ end
- output = process_output!(input, results)
- [@end ? nil : @next, output]
- rescue => err
- retrier = self.retry.detect { |r| (r.error_equals & [err.to_s, "States.ALL"]).any? }
- retry if retry!(retrier)
+ def status
+ @end ? "success" : "running"
+ end
- catcher = self.catch.detect { |c| (c.error_equals & [err.to_s, "States.ALL"]).any? }
- raise if catcher.nil?
+ def finish
+ results = runner.output(context.state["RunnerContext"])
- [catcher.next, output]
+ if success?
+ context.state["Output"] = process_output!(results)
+ context.next_state = next_state
+ else
+ retry_state!(results) || catch_error!(results)
+ end
+
+ super
+ ensure
+ runner.cleanup(context.state["RunnerContext"])
end
- def status
- @end ? "success" : "running"
+ def running?
+ runner.status!(context.state["RunnerContext"])
+ runner.running?(context.state["RunnerContext"])
end
def end?
@end
end
private
- def retry!(retrier)
+ attr_reader :runner
+
+ def success?
+ runner.success?(context.state["RunnerContext"])
+ end
+
+ def find_retrier(error)
+ self.retry.detect { |r| (r.error_equals & [error, "States.ALL"]).any? }
+ end
+
+ def find_catcher(error)
+ self.catch.detect { |c| (c.error_equals & [error, "States.ALL"]).any? }
+ end
+
+ def retry_state!(error)
+ retrier = find_retrier(error)
return if retrier.nil?
# If a different retrier is hit reset the context
if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals
context["State"]["RetryCount"] = 0
@@ -66,15 +92,30 @@
context["State"]["RetryCount"] += 1
return if context["State"]["RetryCount"] > retrier.max_attempts
- Kernel.sleep(retrier.sleep_duration(context["State"]["RetryCount"]))
+ # TODO: Kernel.sleep(retrier.sleep_duration(context["State"]["RetryCount"]))
+ context.next_state = context.state_name
true
end
- def process_output!(output, results)
+ def catch_error!(error)
+ catcher = find_catcher(error)
+ raise error if catcher.nil?
+
+ context.next_state = catcher.next
+ end
+
+ def process_input(input)
+ input = input_path.value(context, input)
+ input = parameters.value(context, input) if parameters
+ input
+ end
+
+ def process_output!(results)
+ output = process_input(context.state["Input"])
return output if results.nil?
return if output_path.nil?
begin
results = JSON.parse(results)
@@ -83,9 +124,13 @@
end
results = result_selector.value(context, results) if result_selector
output = result_path.set(output, results)
output_path.value(context, output)
+ end
+
+ def next_state
+ end? ? nil : @next
end
end
end
end
end