Sha256: 49b1f6dd0e1ebb34d83e52ab79c29bff795ca2abdc3cea4af50b9836df4a80c7
Contents?: true
Size: 1.77 KB
Versions: 17
Compression:
Stored size: 1.77 KB
Contents
module Shoryuken module Middleware module Server class AutoExtendVisibility include Util EXTEND_UPFRONT_SECONDS = 5 def call(worker, queue, sqs_msg, body) return yield unless worker.class.auto_visibility_timeout? if sqs_msg.is_a?(Array) logger.warn { "Auto extend visibility isn't supported for batch workers" } return yield end timer = auto_visibility_timer(worker, queue, sqs_msg, body) yield ensure timer.kill if timer end private class MessageVisibilityExtender include Util def auto_extend(worker, queue, sqs_msg, body) queue_visibility_timeout = Shoryuken::Client.queues(queue).visibility_timeout Concurrent::TimerTask.new(execution_interval: queue_visibility_timeout - EXTEND_UPFRONT_SECONDS) do begin logger.debug do "Extending message #{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id} " \ "visibility timeout by #{queue_visibility_timeout}s." end sqs_msg.change_visibility(visibility_timeout: queue_visibility_timeout) rescue => ex logger.error do 'Could not auto extend the message ' \ "#{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id} " \ "visibility timeout. Error: #{ex.message}" end end end end end def auto_visibility_timer(worker, queue, sqs_msg, body) MessageVisibilityExtender.new.auto_extend(worker, queue, sqs_msg, body).tap(&:execute) end end end end end
Version data entries
17 entries across 17 versions & 1 rubygems