lib/floe/workflow/states/task.rb in floe-0.11.3 vs lib/floe/workflow/states/task.rb in floe-0.12.0

- old
+ new

@@ -16,24 +16,25 @@ @heartbeat_seconds = payload["HeartbeatSeconds"] @next = payload["Next"] @end = !!payload["End"] @resource = payload["Resource"] - @runner = Floe::Runner.for_resource(@resource) + + missing_field_error!("Resource") unless @resource.kind_of?(String) + @runner = wrap_parser_error("Resource", @resource) { Floe::Runner.for_resource(@resource) } + @timeout_seconds = payload["TimeoutSeconds"] - @retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) } - @catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) } + @retry = payload["Retry"].to_a.map.with_index { |retrier, i| Retrier.new(workflow, name + ["Retry", i.to_s], retrier) } + @catch = payload["Catch"].to_a.map.with_index { |catcher, i| Catcher.new(workflow, name + ["Catch", i.to_s], catcher) } @input_path = Path.new(payload.fetch("InputPath", "$")) @output_path = Path.new(payload.fetch("OutputPath", "$")) @result_path = ReferencePath.new(payload.fetch("ResultPath", "$")) @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] @credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"] validate_state!(workflow) - rescue ArgumentError => err - raise Floe::InvalidWorkflowError, err.message end def start(context) super @@ -80,15 +81,15 @@ def success?(context) runner.success?(context.state["RunnerContext"]) end def find_retrier(error) - self.retry.detect { |r| (r.error_equals & [error, "States.ALL"]).any? } + self.retry.detect { |r| r.match_error?(error) } end def find_catcher(error) - self.catch.detect { |c| (c.error_equals & [error, "States.ALL"]).any? } + self.catch.detect { |c| c.match_error?(error) } end def retry_state!(context, error) retrier = find_retrier(error["Error"]) if error return if retrier.nil? @@ -104,30 +105,30 @@ return if context["State"]["RetryCount"] > retrier.max_attempts wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"])) context.next_state = context.state_name context.output = error - logger.info("Running state: [#{long_name}] with input [#{context.input}] got error[#{context.output}]...Retry - delay: #{wait_until(context)}") + logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}") true end def catch_error!(context, error) catcher = find_catcher(error["Error"]) if error return if catcher.nil? context.next_state = catcher.next context.output = catcher.result_path.set(context.input, error) - logger.info("Running state: [#{long_name}] with input [#{context.input}]...CatchError - next state: [#{context.next_state}] output: [#{context.output}]") + logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]") true end def fail_workflow!(context, error) # next_state is nil, and will be set to nil again in super # keeping in here for completeness context.next_state = nil context.output = error - logger.error("Running state: [#{long_name}] with input [#{context.input}]...Complete workflow - output: [#{context.output}]") + logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]") end def parse_error(output) return if output.nil? return output if output.kind_of?(Hash)