Sha256: 60fce2795fcfd7b2a101c1009d4b99e2b1ffdf5d3547bdf652db5ec3500dc777

Contents?: true

Size: 1.7 KB

Versions: 2

Compression:

Stored size: 1.7 KB

Contents

require 'aws-sdk'
require 'alephant/logger'

module Alephant
  module Publisher
    module Queue
      module SQSHelper
        class Queue
          WAIT_TIME = 5
          VISABILITY_TIMEOUT = 300

          include Logger

          attr_reader :queue, :timeout, :wait_time, :archiver

          def initialize(queue, archiver = nil, timeout = VISABILITY_TIMEOUT, wait_time = WAIT_TIME)
            @queue     = queue
            @archiver  = archiver
            @timeout   = timeout
            @wait_time = wait_time

            logger.debug("Queue#initialize: reading from #{queue.url}")
          end

          def message
            receive.tap { |m| process(m) unless m.nil? }
          end

          private

          def process(m)
            logger.metric(
              "MessagesReceived",
              opts[:dimensions].merge(:function => "process")
            )
            logger.info("Queue#message: received #{m.id}")
            archive m
          end

          def archive(m)
            archiver.see(m) unless archiver.nil?
          rescue StandardError => e
            logger.metric(
              "ArchiveFailed",
              opts[:dimensions].merge(:function => "archive")
            )
            logger.warn("Queue#archive: archive failed (#{e.message})");
          end

          def receive
            queue.receive_message({
              :visibility_timeout => timeout,
              :wait_time_seconds  => wait_time
            })
          end

          def opts
            {
              :dimensions => {
                :module   => "PublisherQueueSQSHelper",
                :class    => "Queue"
              }
            }
          end
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
alephant-publisher-queue-1.3.5 lib/alephant/publisher/queue/sqs_helper/queue.rb
alephant-publisher-queue-1.3.4 lib/alephant/publisher/queue/sqs_helper/queue.rb