Sha256: 49b1f6dd0e1ebb34d83e52ab79c29bff795ca2abdc3cea4af50b9836df4a80c7

Contents?: true

Size: 1.77 KB

Versions: 17

Compression:

Stored size: 1.77 KB

Contents

module Shoryuken
  module Middleware
    module Server
      class AutoExtendVisibility
        include Util

        EXTEND_UPFRONT_SECONDS = 5

        def call(worker, queue, sqs_msg, body)
          return yield unless worker.class.auto_visibility_timeout?

          if sqs_msg.is_a?(Array)
            logger.warn { "Auto extend visibility isn't supported for batch workers" }
            return yield
          end

          timer = auto_visibility_timer(worker, queue, sqs_msg, body)
          yield
        ensure
          timer.kill if timer
        end

        private

        class MessageVisibilityExtender
          include Util

          def auto_extend(worker, queue, sqs_msg, body)
            queue_visibility_timeout = Shoryuken::Client.queues(queue).visibility_timeout

            Concurrent::TimerTask.new(execution_interval: queue_visibility_timeout - EXTEND_UPFRONT_SECONDS) do
              begin
                logger.debug do
                  "Extending message #{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id} " \
                  "visibility timeout by #{queue_visibility_timeout}s."
                end

                sqs_msg.change_visibility(visibility_timeout: queue_visibility_timeout)
              rescue => ex
                logger.error do
                  'Could not auto extend the message ' \
                  "#{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id} " \
                  "visibility timeout. Error: #{ex.message}"
                end
              end
            end
          end
        end

        def auto_visibility_timer(worker, queue, sqs_msg, body)
          MessageVisibilityExtender.new.auto_extend(worker, queue, sqs_msg, body).tap(&:execute)
        end
      end
    end
  end
end

Version data entries

17 entries across 17 versions & 1 rubygems

Version Path
shoryuken-3.1.12 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.11 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.10 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.9 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.8 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.7 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.6 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.5 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.4 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.3 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.2 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.1 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.1.0 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.0.11 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.0.10 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.0.9 lib/shoryuken/middleware/server/auto_extend_visibility.rb
shoryuken-3.0.8 lib/shoryuken/middleware/server/auto_extend_visibility.rb