lib/floe/workflow/states/task.rb in floe-0.11.3 vs lib/floe/workflow/states/task.rb in floe-0.12.0
- old
+ new
@@ -16,24 +16,25 @@
@heartbeat_seconds = payload["HeartbeatSeconds"]
@next = payload["Next"]
@end = !!payload["End"]
@resource = payload["Resource"]
- @runner = Floe::Runner.for_resource(@resource)
+
+ missing_field_error!("Resource") unless @resource.kind_of?(String)
+ @runner = wrap_parser_error("Resource", @resource) { Floe::Runner.for_resource(@resource) }
+
@timeout_seconds = payload["TimeoutSeconds"]
- @retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) }
- @catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) }
+ @retry = payload["Retry"].to_a.map.with_index { |retrier, i| Retrier.new(workflow, name + ["Retry", i.to_s], retrier) }
+ @catch = payload["Catch"].to_a.map.with_index { |catcher, i| Catcher.new(workflow, name + ["Catch", i.to_s], catcher) }
@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))
@result_path = ReferencePath.new(payload.fetch("ResultPath", "$"))
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"]
validate_state!(workflow)
- rescue ArgumentError => err
- raise Floe::InvalidWorkflowError, err.message
end
def start(context)
super
@@ -80,15 +81,15 @@
def success?(context)
runner.success?(context.state["RunnerContext"])
end
def find_retrier(error)
- self.retry.detect { |r| (r.error_equals & [error, "States.ALL"]).any? }
+ self.retry.detect { |r| r.match_error?(error) }
end
def find_catcher(error)
- self.catch.detect { |c| (c.error_equals & [error, "States.ALL"]).any? }
+ self.catch.detect { |c| c.match_error?(error) }
end
def retry_state!(context, error)
retrier = find_retrier(error["Error"]) if error
return if retrier.nil?
@@ -104,30 +105,30 @@
return if context["State"]["RetryCount"] > retrier.max_attempts
wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
context.output = error
- logger.info("Running state: [#{long_name}] with input [#{context.input}] got error[#{context.output}]...Retry - delay: #{wait_until(context)}")
+ logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
true
end
def catch_error!(context, error)
catcher = find_catcher(error["Error"]) if error
return if catcher.nil?
context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
- logger.info("Running state: [#{long_name}] with input [#{context.input}]...CatchError - next state: [#{context.next_state}] output: [#{context.output}]")
+ logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]")
true
end
def fail_workflow!(context, error)
# next_state is nil, and will be set to nil again in super
# keeping in here for completeness
context.next_state = nil
context.output = error
- logger.error("Running state: [#{long_name}] with input [#{context.input}]...Complete workflow - output: [#{context.output}]")
+ logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]")
end
def parse_error(output)
return if output.nil?
return output if output.kind_of?(Hash)