Sha256: 9e346c635c88e01fd06e11e4383cf92e4490a518cad9339dd1fa3c13b8d34b46

Contents?: true

Size: 1.81 KB

Versions: 1

Compression:

Stored size: 1.81 KB

Contents

require "aws-sdk"
require "alephant/logger"

module Alephant
  module Publisher
    module Queue
      module SQSHelper
        # Wraps a queue object: receives one message per call to #message,
        # logs its arrival and, when an archiver was supplied, hands the
        # message to that archiver.
        class Queue
          # Default sent as :wait_time_seconds when receiving a message.
          WAIT_TIME = 5
          # Default sent as :visibility_timeout when receiving a message.
          VISIBILITY_TIMEOUT = 300
          # Original (misspelled) constant name, kept so that existing
          # references to it continue to resolve.
          VISABILITY_TIMEOUT = VISIBILITY_TIMEOUT

          include Logger

          attr_reader :queue, :timeout, :wait_time, :archiver

          # @param queue [#url, #receive_message] queue to read from
          # @param archiver [#see, nil] optional object that is shown every
          #   received message; nil disables archiving
          # @param timeout [Object] value sent as :visibility_timeout
          # @param wait_time [Object] value sent as :wait_time_seconds
          def initialize(queue, archiver = nil, timeout = VISIBILITY_TIMEOUT, wait_time = WAIT_TIME)
            @queue     = queue
            @archiver  = archiver
            @timeout   = timeout
            @wait_time = wait_time

            logger.info(
              "event"    => "QueueConfigured",
              "queueUrl" => queue.url,
              "archiver" => archiver,
              "timeout"  => timeout,
              "method"   => "#{self.class}#initialize"
            )
          end

          # Receives a message from the queue and, if one arrived, logs and
          # archives it.
          #
          # @return [Object, nil] whatever queue.receive_message returned;
          #   nil results are returned untouched and skip processing
          def message
            receive.tap { |m| process(m) unless m.nil? }
          end

          private

          # Records the arrival of +m+ (metric + info log) and archives it.
          def process(m)
            logger.metric "MessagesReceived"
            logger.info(
              "event"     => "QueueMessageReceived",
              "messageId" => m.id,
              "method"    => "#{self.class}#process"
            )
            archive m
          end

          # Hands +m+ to the archiver when one is configured. Archiving is
          # best-effort: any StandardError is logged and swallowed so that
          # #message still returns the received message.
          def archive(m)
            archiver.see(m) unless archiver.nil?
          rescue StandardError => e
            logger.metric "ArchiveFailed"
            logger.error(
              "event"     => "MessageArchiveFailed",
              "class"     => e.class,
              "message"   => e.message,
              # One frame per line; Array() guards against a nil backtrace so
              # the error handler itself cannot raise.
              "backtrace" => Array(e.backtrace).join("\n"),
              "method"    => "#{self.class}#archive"
            )
          end

          # Asks the queue for a message using the configured timeout and
          # wait time.
          def receive
            queue.receive_message({
              :visibility_timeout => timeout,
              :wait_time_seconds  => wait_time
            })
          end
        end
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
alephant-publisher-queue-2.0.0 lib/alephant/publisher/queue/sqs_helper/queue.rb