lib/workato/connector/sdk/action.rb in workato-connector-sdk-1.2.0 vs lib/workato/connector/sdk/action.rb in workato-connector-sdk-1.3.0
- old
+ new
@@ -9,33 +9,38 @@
module Sdk
class Action < Operation
extend T::Sig
using BlockInvocationRefinements
+ include Dsl::ReinvokeAfter
+
RETRY_DEFAULT_CODES = T.let([429, 500, 502, 503, 504, 507].freeze, T::Array[Integer])
RETRY_DEFAULT_METHODS = T.let(%i[get head].freeze, T::Array[Symbol])
RETRY_DELAY = T.let(5, Integer) # seconds
+ RETRY_DELAY_EXP_BASE = T.let(2, Integer)
MAX_RETRIES = 3
- MAX_REINVOKES = 5
-
sig do
params(
action: SorbetTypes::SourceHash,
methods: SorbetTypes::SourceHash,
connection: Connection,
- object_definitions: T.nilable(ObjectDefinitions)
+ object_definitions: T.nilable(ObjectDefinitions),
+ streams: Streams
).void
end
- def initialize(action:, methods: {}, connection: Connection.new, object_definitions: nil)
+ def initialize(action:, methods: {}, connection: Connection.new, object_definitions: nil,
+ streams: ProhibitedStreams.new)
super(
operation: action,
connection: connection,
methods: methods,
- object_definitions: object_definitions
+ object_definitions: object_definitions,
+ streams: streams
)
+ @retry_delay_factor = T.let(1, Integer)
@retries_left = T.let(0, Integer)
@retry_codes = T.let([], T::Array[Integer])
@retry_methods = T.let([], T::Array[String])
@retry_matchers = T.let([], T::Array[T.any(Symbol, String, Regexp)])
@@ -56,37 +61,25 @@
end
def execute(settings = nil, input = {}, extended_input_schema = [], extended_output_schema = [], continue = {},
&block)
raise InvalidDefinitionError, "'execute' block is required for action" unless block || action[:execute]
- loop do
- if @reinvokes_remaining&.zero?
- raise "Max number of reinvokes on SDK Gem reached. Current limit is #{reinvoke_limit}"
- end
-
- reinvoke_sleep if @reinvoke_after
-
- reinvoke_reset
-
- result = super(
+ loop_reinvoke_after(continue) do |next_continue|
+ return super(
settings,
input,
extended_input_schema,
extended_output_schema,
- continue,
+ next_continue,
&(block || action[:execute])
)
-
- break result unless @reinvoke_after
-
- continue = @reinvoke_after.continue
end
- rescue RequestError => e
+ rescue RequestFailedError => e
raise e unless retry?(e)
@retries_left -= 1
- sleep(RETRY_DELAY)
+ retry_sleep
retry
end
sig { params(input: SorbetTypes::OperationInputHash).returns(T::Hash[T.any(String, Symbol), T.untyped]) }
def invoke(input = {})
@@ -98,36 +91,10 @@
input = apply_input_schema(input, config_schema + input_schema)
output = execute(nil, input, input_schema, output_schema)
apply_output_schema(output, output_schema)
end
- sig do
- params(
- continue: T::Hash[T.untyped, T.untyped],
- temp_output: T.nilable(T::Hash[T.untyped, T.untyped])
- ).void
- end
- def checkpoint!(continue:, temp_output: nil)
- # no-op
- end
-
- sig do
- params(
- seconds: Integer,
- continue: T::Hash[T.untyped, T.untyped],
- temp_output: T.nilable(T::Hash[T.untyped, T.untyped])
- ).void
- end
- def reinvoke_after(seconds:, continue:, temp_output: nil) # rubocop:disable Lint/UnusedMethodArgument
- @reinvokes_remaining = T.let(@reinvokes_remaining, T.nilable(Integer))
- @reinvokes_remaining = (@reinvokes_remaining ? @reinvokes_remaining - 1 : reinvoke_limit)
- @reinvoke_after = ReinvokeAfter.new(
- seconds: seconds,
- continue: continue
- )
- end
-
private
sig { returns(T::Array[T.any(Symbol, String, Regexp, Integer)]) }
def retry_on_response
Array(action[:retry_on_response])
@@ -148,14 +115,14 @@
return if retry_on_response.blank?
retry_on_response.each { |m| m.is_a?(::Integer) ? @retry_codes << m : @retry_matchers << m }
@retry_codes = RETRY_DEFAULT_CODES if @retry_codes.empty?
@retry_methods = (retry_on_request.presence || RETRY_DEFAULT_METHODS).map(&:to_s).map(&:downcase)
- @retries_left = [[max_retries.is_a?(::Integer) && max_retries || MAX_RETRIES, MAX_RETRIES].min, 0].max
+ @retries_left = [[(max_retries.is_a?(::Integer) && max_retries) || MAX_RETRIES, MAX_RETRIES].min, 0].max
end
- sig { params(exception: RequestError).returns(T::Boolean) }
+ sig { params(exception: RequestFailedError).returns(T::Boolean) }
def retry?(exception)
return false unless @retries_left.positive?
return false unless @retry_codes.include?(exception.code.to_i)
return false unless @retry_matchers.empty? || @retry_matchers.any? do |m|
m === exception.message || m === exception.response
@@ -163,30 +130,13 @@
@retry_methods.include?(exception.method.to_s.downcase)
end
sig { void }
- def reinvoke_sleep
- sleep((ENV['WAIT_REINVOKE_AFTER'].presence || T.must(@reinvoke_after).seconds).to_f)
+ def retry_sleep
+ sleep(@retry_delay_factor * RETRY_DELAY)
+ @retry_delay_factor *= RETRY_DELAY_EXP_BASE
end
-
- sig { returns(Integer) }
- def reinvoke_limit
- @reinvoke_limit = T.let(@reinvoke_limit, T.nilable(Integer))
- @reinvoke_limit ||= (ENV['MAX_REINVOKES'].presence || MAX_REINVOKES).to_i
- end
-
- sig { void }
- def reinvoke_reset
- @reinvoke_after = T.let(nil, T.nilable(ReinvokeAfter))
- end
-
- class ReinvokeAfter < T::Struct
- prop :seconds, T.any(Float, Integer)
- prop :continue, T::Hash[T.untyped, T.untyped]
- end
-
- private_constant :ReinvokeAfter
alias action operation
end
end
end