Sha256: 4aab7d976770bd06043a1a8c2f596aace10d02977bba170a30b83830bc596f1c

Contents?: true

Size: 1.86 KB

Versions: 8

Compression:

Stored size: 1.86 KB

Contents

require_relative 'env'

require 'alephant/publisher/queue/version'
require 'alephant/publisher/queue/options'
require 'alephant/publisher/queue/sqs_helper/queue'
require 'alephant/publisher/queue/sqs_helper/archiver'
require 'alephant/logger'
require 'alephant/publisher/queue/processor'
require 'json'

module Alephant
  module Publisher
    module Queue
      def self.create(opts = {}, processor = nil)
        processor ||= Processor.new(opts.writer)
        Publisher.new(opts, processor)
      end

      class Publisher
        include Logger

        VISIBILITY_TIMEOUT = 60
        RECEIVE_WAIT_TIME  = 15

        attr_reader :queue, :executor, :opts, :processor

        def initialize(opts, processor = nil)
          @opts = opts
          @processor = processor

          @queue = SQSHelper::Queue.new(
            aws_queue,
            archiver,
            opts.queue[:visibility_timeout] || VISIBILITY_TIMEOUT,
            opts.queue[:receive_wait_time]  || RECEIVE_WAIT_TIME,
          )
        end

        def run!
          loop { processor.consume(@queue.message) }
        end

        private

        def archiver
          SQSHelper::Archiver.new(archive_cache)
        end

        def archive_cache
          Cache.new(
            opts.writer[:s3_bucket_id],
            opts.writer[:s3_object_path]
          )
        end

        def sqs_client
          @sqs_client ||= AWS::SQS.new
        end

        def sqs_queue_options
          logger.info "Publisher::Queue::Publisher#sqs_queue_options: AWS Account ID '#{opts.queue[:aws_account_id]}'"
          opts.queue[:aws_account_id].nil? ? {} : { :queue_owner_aws_account_id => opts.queue[:aws_account_id] }
        end

        def aws_queue
          queue_url = sqs_client.queues.url_for(opts.queue[:sqs_queue_name], sqs_queue_options)
          sqs_client.queues[queue_url]
        end
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
alephant-publisher-queue-1.3.6 lib/alephant/publisher/queue.rb
alephant-publisher-queue-1.3.5 lib/alephant/publisher/queue.rb
alephant-publisher-queue-1.3.4 lib/alephant/publisher/queue.rb
alephant-publisher-queue-1.3.3 lib/alephant/publisher/queue.rb
alephant-publisher-queue-1.3.2 lib/alephant/publisher/queue.rb
alephant-publisher-queue-1.2.2 lib/alephant/publisher/queue.rb
alephant-publisher-queue-1.2.1 lib/alephant/publisher/queue.rb
alephant-publisher-queue-1.2.0 lib/alephant/publisher/queue.rb