Sha256: 5064089cba0f25154d10fd382b2df079dd798c203144173a39874ed6dcb07c3b

Contents?: true

Size: 1.34 KB

Versions: 3

Compression:

Stored size: 1.34 KB

Contents

require 'aws-sdk'
require 'alephant/logger'

module Alephant
  module Publisher
    module Queue
      module SQSHelper
        class Queue
          WAIT_TIME = 5
          VISABILITY_TIMEOUT = 300

          include Logger

          attr_reader :queue, :timeout, :wait_time, :archiver

          def initialize(queue, archiver = nil, timeout = VISABILITY_TIMEOUT, wait_time = WAIT_TIME)
            @queue     = queue
            @archiver  = archiver
            @timeout   = timeout
            @wait_time = wait_time

            logger.debug("Queue#initialize: reading from #{queue.url}")
          end

          def message
            receive.tap { |m| process(m) unless m.nil? }
          end

          private

          def process(m)
            logger.info("Queue#message: received #{m.id}")
            archive m
          end

          def archive(m)
            archiver.see(m) unless archiver.nil?
          rescue StandardError => e
            logger.metric(:name => "PublisherQueueSQSHelperArchiveFailed", :unit => "Count", :value => 1)
            logger.warn("Queue#archive: archive failed (#{e.message})");
          end

          def receive
            queue.receive_message({
              :visibility_timeout => timeout,
              :wait_time_seconds  => wait_time
            })
          end
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
alephant-publisher-queue-1.2.2 lib/alephant/publisher/queue/sqs_helper/queue.rb
alephant-publisher-queue-1.2.1 lib/alephant/publisher/queue/sqs_helper/queue.rb
alephant-publisher-queue-1.2.0 lib/alephant/publisher/queue/sqs_helper/queue.rb