lib/floe/workflow/states/task.rb in floe-0.3.1 vs lib/floe/workflow/states/task.rb in floe-0.4.0

- old
+ new

@@ -13,10 +13,11 @@ @heartbeat_seconds = payload["HeartbeatSeconds"] @next = payload["Next"] @end = !!payload["End"] @resource = payload["Resource"] + @runner = Floe::Workflow::Runner.for_resource(@resource) @timeout_seconds = payload["TimeoutSeconds"] @retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) } @catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) } @input_path = Path.new(payload.fetch("InputPath", "$")) @output_path = Path.new(payload.fetch("OutputPath", "$")) @@ -24,40 +25,65 @@ @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] @credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"] end - def run!(input) + def start(input) + super input = input_path.value(context, input) input = parameters.value(context, input) if parameters - runner = Floe::Workflow::Runner.for_resource(resource) - _exit_status, results = runner.run!(resource, input, credentials&.value({}, workflow.credentials)) + runner_context = runner.run_async!(resource, input, credentials&.value({}, workflow.credentials)) + context.state["RunnerContext"] = runner_context + end - output = process_output!(input, results) - [@end ? nil : @next, output] - rescue => err - retrier = self.retry.detect { |r| (r.error_equals & [err.to_s, "States.ALL"]).any? } - retry if retry!(retrier) + def status + @end ? "success" : "running" + end - catcher = self.catch.detect { |c| (c.error_equals & [err.to_s, "States.ALL"]).any? } - raise if catcher.nil? + def finish + results = runner.output(context.state["RunnerContext"]) - [catcher.next, output] + if success? + context.state["Output"] = process_output!(results) + context.next_state = next_state + else + retry_state!(results) || catch_error!(results) + end + + super + ensure + runner.cleanup(context.state["RunnerContext"]) end - def status - @end ? "success" : "running" + def running? + runner.status!(context.state["RunnerContext"]) + runner.running?(context.state["RunnerContext"]) end def end? @end end private - def retry!(retrier) + attr_reader :runner + + def success? + runner.success?(context.state["RunnerContext"]) + end + + def find_retrier(error) + self.retry.detect { |r| (r.error_equals & [error, "States.ALL"]).any? } + end + + def find_catcher(error) + self.catch.detect { |c| (c.error_equals & [error, "States.ALL"]).any? } + end + + def retry_state!(error) + retrier = find_retrier(error) return if retrier.nil? # If a different retrier is hit reset the context if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals context["State"]["RetryCount"] = 0 @@ -66,15 +92,30 @@ context["State"]["RetryCount"] += 1 return if context["State"]["RetryCount"] > retrier.max_attempts - Kernel.sleep(retrier.sleep_duration(context["State"]["RetryCount"])) + # TODO: Kernel.sleep(retrier.sleep_duration(context["State"]["RetryCount"])) + context.next_state = context.state_name true end - def process_output!(output, results) + def catch_error!(error) + catcher = find_catcher(error) + raise error if catcher.nil? + + context.next_state = catcher.next + end + + def process_input(input) + input = input_path.value(context, input) + input = parameters.value(context, input) if parameters + input + end + + def process_output!(results) + output = process_input(context.state["Input"]) return output if results.nil? return if output_path.nil? begin results = JSON.parse(results) @@ -83,9 +124,13 @@ end results = result_selector.value(context, results) if result_selector output = result_path.set(output, results) output_path.value(context, output) + end + + def next_state + end? ? nil : @next end end end end end