lib/workato/connector/sdk/action.rb in workato-connector-sdk-1.0.1 vs lib/workato/connector/sdk/action.rb in workato-connector-sdk-1.0.2

- old
+ new

@@ -1,35 +1,61 @@ +# typed: strict # frozen_string_literal: true require_relative './operation' require_relative './block_invocation_refinements' module Workato module Connector module Sdk class Action < Operation + extend T::Sig using BlockInvocationRefinements - RETRY_DEFAULT_CODES = [429, 500, 502, 503, 504, 507].freeze - RETRY_DEFAULT_METHODS = %i[get head].freeze - RETRY_DELAY = 5.seconds + RETRY_DEFAULT_CODES = T.let([429, 500, 502, 503, 504, 507].freeze, T::Array[Integer]) + RETRY_DEFAULT_METHODS = T.let(%i[get head].freeze, T::Array[Symbol]) + RETRY_DELAY = T.let(5, Integer) # seconds MAX_RETRIES = 3 MAX_REINVOKES = 5 - def initialize(action:, connection: {}, methods: {}, settings: {}, object_definitions: nil) + sig do + params( + action: SorbetTypes::SourceHash, + methods: SorbetTypes::SourceHash, + connection: Connection, + object_definitions: T.nilable(ObjectDefinitions) + ).void + end + def initialize(action:, methods: {}, connection: Connection.new, object_definitions: nil) super( operation: action, connection: connection, methods: methods, - settings: settings, object_definitions: object_definitions ) + @retries_left = T.let(0, Integer) + @retry_codes = T.let([], T::Array[Integer]) + @retry_methods = T.let([], T::Array[String]) + @retry_matchers = T.let([], T::Array[T.any(Symbol, String, Regexp)]) + initialize_retry end + sig do + params( + settings: T.nilable(SorbetTypes::SettingsHash), + input: SorbetTypes::OperationInputHash, + extended_input_schema: SorbetTypes::OperationSchema, + extended_output_schema: SorbetTypes::OperationSchema, + continue: T::Hash[T.any(Symbol, String), T.untyped], + block: T.nilable(SorbetTypes::OperationExecuteProc) + ).returns( + T.untyped + ) + end def execute(settings = nil, input = {}, extended_input_schema = [], extended_output_schema = [], continue = {}, &block) raise InvalidDefinitionError, "'execute' block is required for action" unless block || action[:execute] loop do @@ -37,11 +63,11 @@ raise "Max number of reinvokes on SDK Gem reached. Current limit is #{reinvoke_limit}" end reinvoke_sleep if @reinvoke_after - @reinvoke_after = nil + reinvoke_reset result = super( settings, input, extended_input_schema, @@ -50,19 +76,21 @@ &(block || action[:execute]) ) break result unless @reinvoke_after - continue = @reinvoke_after[:continue] + continue = @reinvoke_after.continue end rescue RequestError => e raise e unless retry?(e) @retries_left -= 1 - sleep(RETRY_DELAY) && retry + sleep(RETRY_DELAY) + retry end + sig { params(input: SorbetTypes::OperationInputHash).returns(T::Hash[T.any(String, Symbol), T.untyped]) } def invoke(input = {}) extended_schema = extended_schema(nil, input) config_schema = Schema.new(schema: config_fields_schema) input_schema = Schema.new(schema: extended_schema[:input]) output_schema = Schema.new(schema: extended_schema[:output]) @@ -70,65 +98,95 @@ input = apply_input_schema(input, config_schema + input_schema) output = execute(nil, input, input_schema, output_schema) apply_output_schema(output, output_schema) end + sig do + params( + continue: T::Hash[T.untyped, T.untyped], + temp_output: T.nilable(T::Hash[T.untyped, T.untyped]) + ).void + end def checkpoint!(continue:, temp_output: nil) # no-op end - def reinvoke_after(seconds:, continue:, temp_output: nil) + sig do + params( + seconds: Integer, + continue: T::Hash[T.untyped, T.untyped], + temp_output: T.nilable(T::Hash[T.untyped, T.untyped]) + ).void + end + def reinvoke_after(seconds:, continue:, temp_output: nil) # rubocop:disable Lint/UnusedMethodArgument + @reinvokes_remaining = T.let(@reinvokes_remaining, T.nilable(Integer)) @reinvokes_remaining = (@reinvokes_remaining ? @reinvokes_remaining - 1 : reinvoke_limit) - @reinvoke_after = { + @reinvoke_after = ReinvokeAfter.new( seconds: seconds, - continue: continue, - temp_output: temp_output - } + continue: continue + ) end private + sig { returns(T::Array[T.any(Symbol, String, Regexp, Integer)]) } def retry_on_response Array(action[:retry_on_response]) end + sig { returns(T::Array[T.any(Symbol, String, Regexp, Integer)]) } def retry_on_request Array(action[:retry_on_request]) end + sig { returns(T.nilable(Integer)) } def max_retries action[:max_retries] end + sig { void } def initialize_retry - @retries_left = 0 return if retry_on_response.blank? - @retry_codes = [] - @retry_matchers = [] retry_on_response.each { |m| m.is_a?(::Integer) ? @retry_codes << m : @retry_matchers << m } @retry_codes = RETRY_DEFAULT_CODES if @retry_codes.empty? @retry_methods = (retry_on_request.presence || RETRY_DEFAULT_METHODS).map(&:to_s).map(&:downcase) @retries_left = [[max_retries.is_a?(::Integer) && max_retries || MAX_RETRIES, MAX_RETRIES].min, 0].max end + sig { params(exception: RequestError).returns(T::Boolean) } def retry?(exception) - return unless @retries_left.positive? - return unless @retry_codes.include?(exception.code.to_i) - return unless @retry_matchers.empty? || @retry_matchers.any? do |m| + return false unless @retries_left.positive? + return false unless @retry_codes.include?(exception.code.to_i) + return false unless @retry_matchers.empty? || @retry_matchers.any? do |m| m === exception.message || m === exception.response end @retry_methods.include?(exception.method.to_s.downcase) end + sig { void } def reinvoke_sleep - sleep((ENV['WAIT_REINVOKE_AFTER'].presence || @reinvoke_after[:seconds]).to_f) + sleep((ENV['WAIT_REINVOKE_AFTER'].presence || T.must(@reinvoke_after).seconds).to_f) end + sig { returns(Integer) } def reinvoke_limit + @reinvoke_limit = T.let(@reinvoke_limit, T.nilable(Integer)) @reinvoke_limit ||= (ENV['MAX_REINVOKES'].presence || MAX_REINVOKES).to_i end + + sig { void } + def reinvoke_reset + @reinvoke_after = T.let(nil, T.nilable(ReinvokeAfter)) + end + + class ReinvokeAfter < T::Struct + prop :seconds, T.any(Float, Integer) + prop :continue, T::Hash[T.untyped, T.untyped] + end + + private_constant :ReinvokeAfter alias action operation end end end