lib/workato/connector/sdk/action.rb in workato-connector-sdk-0.1.2 vs lib/workato/connector/sdk/action.rb in workato-connector-sdk-0.2.0
- old
+ new
@@ -12,10 +12,12 @@
RETRY_DEFAULT_CODES = [429, 500, 502, 503, 504, 507].freeze
RETRY_DEFAULT_METHODS = %i[get head].freeze
RETRY_DELAY = 5.seconds
MAX_RETRIES = 3
+ MAX_REINVOKES = 5
+
def initialize(action:, connection: {}, methods: {}, settings: {}, object_definitions: nil)
super(
operation: action,
connection: connection,
methods: methods,
@@ -24,27 +26,54 @@
)
initialize_retry
end
- def execute(settings = nil, input = {}, extended_input_schema = [], extended_output_schema = [], &block)
+ def execute(settings = nil, input = {}, extended_input_schema = [], extended_output_schema = [], continue = {},
+ &block)
raise InvalidDefinitionError, "'execute' block is required for action" unless block || action[:execute]
- super(settings, input, extended_input_schema, extended_output_schema, &(block || action[:execute]))
+ loop do
+ if @reinvokes_remaining&.zero?
+ raise "Max number of reinvokes on SDK Gem reached. Current limit is #{reinvoke_limit}"
+ end
+
+ reinvoke_sleep if @reinvoke_after
+
+ @reinvoke_after = nil
+
+ result = super(
+ settings,
+ input,
+ extended_input_schema,
+ extended_output_schema,
+ continue,
+ &(block || action[:execute])
+ )
+
+ break result unless @reinvoke_after
+
+ continue = @reinvoke_after[:continue]
+ end
rescue RequestError => e
raise e unless retry?(e)
@retries_left -= 1
sleep(RETRY_DELAY) && retry
end
def checkpoint!(continue:, temp_output: nil)
- raise NotImplementedError
+ # no-op
end
def reinvoke_after(seconds:, continue:, temp_output: nil)
- raise NotImplementedError
+ @reinvokes_remaining = (@reinvokes_remaining ? @reinvokes_remaining - 1 : reinvoke_limit)
+ @reinvoke_after = {
+ seconds: seconds,
+ continue: continue,
+ temp_output: temp_output
+ }
end
private
def retry_on_response
@@ -77,9 +106,17 @@
return unless @retry_matchers.empty? || @retry_matchers.any? do |m|
m === exception.message || m === exception.response
end
@retry_methods.include?(exception.method.to_s.downcase)
+ end
+
+ def reinvoke_sleep
+ sleep((ENV['WAIT_REINVOKE_AFTER'].presence || @reinvoke_after[:seconds]).to_f)
+ end
+
+ def reinvoke_limit
+ @reinvoke_limit ||= (ENV['MAX_REINVOKES'].presence || MAX_REINVOKES).to_i
end
alias action operation
end
end