Sha256: 3e01faef012e7bcdc8f66fce951a34ff42e6036cff57d03412fe9c91840c809a

Contents?: true

Size: 1.83 KB

Versions: 7

Compression:

Stored size: 1.83 KB

Contents

module Shoryuken
  module Middleware
    module Server
      # Middleware that reacts to a failing worker by pushing back the
      # visibility timeout of the failed message, using the intervals found
      # under the worker class' 'retry_intervals' Shoryuken option.
      class ExponentialBackoffRetry
        include Util

        # Yields to the given block and, when it raises, tries to schedule a
        # delayed retry for the message.
        #
        # @param worker [Object] worker instance; its class must respond to
        #   +get_shoryuken_options+
        # @param queue [Object] unused here, part of the middleware signature
        # @param sqs_msg [Object, Array] the received message, or an Array of
        #   messages for batch workers
        # @param body [Object] unused here, part of the middleware signature
        # @return [Object, nil] the block's value, or nil when the failure was
        #   absorbed and a retry was scheduled
        # @raise re-raises the block's exception when no retry is scheduled
        def call(worker, queue, sqs_msg, body)
          if sqs_msg.is_a?(Array)
            logger.warn { "Exponential backoff isn't supported for batch workers" }
            # Deliberately outside of the rescue below: a batch failure must
            # propagate untouched. Running the retry logic against an Array
            # would raise NoMethodError and hide the worker's real exception.
            return yield
          end

          started_at = Time.now

          begin
            yield
          rescue
            retry_intervals = worker.class.get_shoryuken_options['retry_intervals']

            if retry_intervals.nil? || !handle_failure(sqs_msg, started_at, retry_intervals)
              # Re-raise the exception if the job is not going to be exponential backoff retried.
              # This allows custom middleware (like exception notifiers) to be aware of the unhandled failure.
              raise
            end
          end
        end

        private

        # Picks the delay for the given attempt number.
        #
        # @param retry_intervals [#call, Array, Object] a callable receiving the
        #   attempt number, or a list (or single value) of delays
        # @param attempts [Integer] number of times the message was received
        # @return [Object, nil] the selected delay; nil when the list is empty
        def get_interval(retry_intervals, attempts)
          return retry_intervals.call(attempts) if retry_intervals.respond_to?(:call)

          # Attempts beyond the end of the list keep using the last interval.
          if attempts <= (retry_intervals = Array(retry_intervals)).size
            retry_intervals[attempts - 1]
          else
            retry_intervals.last
          end
        end

        # Caps the requested delay so that it fits in what is left of a
        # 43_200 second (12 hour) window that started at +started_at+.
        # NOTE(review): 43_200 matches the SQS maximum visibility timeout.
        #
        # @param interval [Numeric] requested delay in seconds
        # @param started_at [Time] when processing of the message began
        # @return [Integer] delay to apply, in whole seconds
        def next_visibility_timeout(interval, started_at)
          max_timeout = 43_200 - (Time.now - started_at).ceil - 1
          interval = max_timeout if interval > max_timeout
          interval.to_i
        end

        # Schedules the retry by changing the message's visibility timeout.
        #
        # @param sqs_msg [Object] message responding to +attributes+,
        #   +change_visibility+ and +message_id+
        # @param started_at [Time] when processing of the message began
        # @param retry_intervals [#call, Array, Object] see #get_interval
        # @return [Boolean] true when a retry was scheduled, false when no
        #   interval is available for this attempt
        def handle_failure(sqs_msg, started_at, retry_intervals)
          receive_count = sqs_msg.attributes['ApproximateReceiveCount'].to_i

          return false unless (interval = get_interval(retry_intervals, receive_count))

          sqs_msg.change_visibility(visibility_timeout: next_visibility_timeout(interval.to_i, started_at))

          logger.info { "Message #{sqs_msg.message_id} failed, will be retried in #{interval} seconds." }

          true
        end
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
shoryuken-3.0.6 lib/shoryuken/middleware/server/exponential_backoff_retry.rb
shoryuken-3.0.5 lib/shoryuken/middleware/server/exponential_backoff_retry.rb
shoryuken-3.0.4 lib/shoryuken/middleware/server/exponential_backoff_retry.rb
shoryuken-3.0.3 lib/shoryuken/middleware/server/exponential_backoff_retry.rb
shoryuken-3.0.2 lib/shoryuken/middleware/server/exponential_backoff_retry.rb
shoryuken-3.0.1 lib/shoryuken/middleware/server/exponential_backoff_retry.rb
shoryuken-3.0.0 lib/shoryuken/middleware/server/exponential_backoff_retry.rb