Sha256: 60fce2795fcfd7b2a101c1009d4b99e2b1ffdf5d3547bdf652db5ec3500dc777
Contents?: true
Size: 1.7 KB
Versions: 2
Compression:
Stored size: 1.7 KB
Contents
require 'aws-sdk'
require 'alephant/logger'

module Alephant
  module Publisher
    module Queue
      module SQSHelper
        # Thin wrapper around a queue object that pulls one message at a
        # time, records a metric and a log line for each message received,
        # and optionally hands the message to an archiver.
        #
        # The wrapped queue must respond to +url+ and +receive_message+;
        # the archiver, when given, must respond to +see+.
        class Queue
          # Default value passed as :wait_time_seconds when receiving.
          WAIT_TIME = 5
          # Default value passed as :visibility_timeout when receiving.
          # NOTE: the constant name is misspelled but is part of the public
          # interface, so it is kept as-is.
          VISABILITY_TIMEOUT = 300

          include Logger

          attr_reader :queue, :timeout, :wait_time, :archiver

          # queue     - object to read messages from.
          # archiver  - optional object notified (via +see+) of every
          #             received message; nil disables archiving.
          # timeout   - forwarded as :visibility_timeout on receive.
          # wait_time - forwarded as :wait_time_seconds on receive.
          def initialize(queue, archiver = nil, timeout = VISABILITY_TIMEOUT, wait_time = WAIT_TIME)
            @queue     = queue
            @archiver  = archiver
            @timeout   = timeout
            @wait_time = wait_time

            logger.debug("Queue#initialize: reading from #{queue.url}")
          end

          # Receives a single message from the queue. When one is returned
          # it is processed (metric, log, archive) before being handed back.
          #
          # Returns whatever the queue returned, including nil.
          def message
            received = receive
            process(received) unless received.nil?
            received
          end

          private

          # Emits the "MessagesReceived" metric, logs the message id, then
          # archives the message.
          def process(msg)
            logger.metric("MessagesReceived", dimensions_for("process"))
            logger.info("Queue#message: received #{msg.id}")
            archive(msg)
          end

          # Passes the message to the archiver when one is configured.
          # Archiving is best-effort: any StandardError is turned into an
          # "ArchiveFailed" metric and a warning instead of propagating.
          def archive(msg)
            return if archiver.nil?

            archiver.see(msg)
          rescue StandardError => e
            logger.metric("ArchiveFailed", dimensions_for("archive"))
            logger.warn("Queue#archive: archive failed (#{e.message})")
          end

          # Asks the queue for a message using the configured timeouts.
          # The options are passed as a single positional hash.
          def receive
            options = {
              visibility_timeout: timeout,
              wait_time_seconds:  wait_time
            }
            queue.receive_message(options)
          end

          # Base metric dimensions merged with the name of the calling
          # function.
          def dimensions_for(function)
            opts[:dimensions].merge(function: function)
          end

          # Static metric options identifying this module and class.
          def opts
            {
              dimensions: {
                module: "PublisherQueueSQSHelper",
                class:  "Queue"
              }
            }
          end
        end
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
alephant-publisher-queue-1.3.5 | lib/alephant/publisher/queue/sqs_helper/queue.rb |
alephant-publisher-queue-1.3.4 | lib/alephant/publisher/queue/sqs_helper/queue.rb |