Sha256: 9e346c635c88e01fd06e11e4383cf92e4490a518cad9339dd1fa3c13b8d34b46
Contents?: true
Size: 1.81 KB
Versions: 1
Compression:
Stored size: 1.81 KB
Contents
require "aws-sdk"
require "alephant/logger"

module Alephant
  module Publisher
    module Queue
      module SQSHelper
        # Wraps a queue object and hands out one received message at a time,
        # logging each receipt and optionally passing the message to an
        # archiver.
        #
        # The wrapped queue must respond to +url+ and +receive_message+; the
        # archiver (when given) must respond to +see+.
        class Queue
          # Default value passed as +:wait_time_seconds+ when receiving.
          WAIT_TIME = 5

          # Default value passed as +:visibility_timeout+ when receiving.
          VISIBILITY_TIMEOUT = 300

          # Original (misspelled) constant name, kept so that existing
          # references to it continue to resolve.
          VISABILITY_TIMEOUT = VISIBILITY_TIMEOUT

          include Logger

          attr_reader :queue, :timeout, :wait_time, :archiver

          # @param queue [#url, #receive_message] queue to read messages from
          # @param archiver [#see, nil] optional object notified of every
          #   received message
          # @param timeout [Integer] sent as +:visibility_timeout+ on receive
          # @param wait_time [Integer] sent as +:wait_time_seconds+ on receive
          def initialize(queue, archiver = nil, timeout = VISIBILITY_TIMEOUT, wait_time = WAIT_TIME)
            @queue     = queue
            @archiver  = archiver
            @timeout   = timeout
            @wait_time = wait_time

            logger.info(
              "event"    => "QueueConfigured",
              "queueUrl" => queue.url,
              "archiver" => archiver,
              "timeout"  => timeout,
              "method"   => "#{self.class}#initialize"
            )
          end

          # Receives a message from the queue. A non-nil message is logged and
          # archived before being returned.
          #
          # @return [Object, nil] whatever +queue.receive_message+ returned
          def message
            receive.tap { |m| process(m) unless m.nil? }
          end

          private

          # Records the receipt of +m+ (metric + info log) and archives it.
          def process(m)
            logger.metric "MessagesReceived"
            logger.info(
              "event"     => "QueueMessageReceived",
              "messageId" => m.id,
              "method"    => "#{self.class}#process"
            )
            archive m
          end

          # Passes +m+ to the archiver, if one was configured. Archiving is
          # best-effort: any StandardError is logged and swallowed so that the
          # caller still gets the message.
          def archive(m)
            archiver.see(m) unless archiver.nil?
          rescue StandardError => e
            logger.metric "ArchiveFailed"
            logger.error(
              "event"     => "MessageArchiveFailed",
              "class"     => e.class,
              "message"   => e.message,
              # Exception#backtrace may be nil; Array() keeps this handler
              # from raising itself. Frames are newline-separated so they
              # stay distinguishable in the log entry.
              "backtrace" => Array(e.backtrace).join("\n"),
              "method"    => "#{self.class}#archive"
            )
          end

          # Asks the underlying queue for a message using the configured
          # visibility timeout and wait time.
          def receive
            queue.receive_message(
              :visibility_timeout => timeout,
              :wait_time_seconds  => wait_time
            )
          end
        end
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
alephant-publisher-queue-2.0.0 | lib/alephant/publisher/queue/sqs_helper/queue.rb |