lib/alephant/publisher/queue.rb in alephant-publisher-queue-2.3.1 vs lib/alephant/publisher/queue.rb in alephant-publisher-queue-2.4.0

- old
+ new

@@ -1,105 +1,23 @@ require_relative "env" require "alephant/publisher/queue/version" require "alephant/publisher/queue/options" +require "alephant/publisher/queue/publisher" require "alephant/publisher/queue/sqs_helper/queue" require "alephant/publisher/queue/sqs_helper/archiver" require "alephant/publisher/queue/processor" +require "alephant/publisher/queue/revalidate_processor" require "alephant/logger" require "alephant/cache" require "json" module Alephant module Publisher module Queue - def self.create(opts = {}, processor = nil) - processor ||= Processor.new(opts.writer) + def self.create(opts, processor = nil) + processor ||= Processor.new(opts) Publisher.new(opts, processor) - end - - class Publisher - include Alephant::Logger - - VISIBILITY_TIMEOUT = 60 - RECEIVE_WAIT_TIME = 15 - - attr_reader :queue, :executor, :opts, :processor - - def initialize(opts, processor = nil) - @opts = opts - @processor = processor - - @queue = Alephant::Publisher::Queue::SQSHelper::Queue.new( - aws_queue, - archiver, - opts.queue[:visibility_timeout] || VISIBILITY_TIMEOUT, - opts.queue[:receive_wait_time] || RECEIVE_WAIT_TIME - ) - end - - def run! - loop { processor.consume(@queue.message) } - end - - private - - def archiver - Alephant::Publisher::Queue::SQSHelper::Archiver.new(archive_cache, archiver_opts) - end - - def archiver_opts - options = { - :async_store => true, - :log_archive_message => true, - :log_validator => opts.queue[:log_validator] - } - options.each do |key, _value| - options[key] = opts.queue[key] == "true" if whitelist_key(opts.queue, key) - end - end - - def whitelist_key(options, key) - options.key?(key) && key != :log_validator - end - - def archive_cache - Alephant::Cache.new( - opts.writer[:s3_bucket_id], - opts.writer[:s3_object_path] - ) - end - - def get_region - opts.queue[:sqs_account_region] || AWS.config.region - end - - def sqs_client - @sqs_client ||= AWS::SQS.new(:region => get_region) - end - - def sqs_queue_options - (opts.queue[:aws_account_id].nil? ? {} : fallback).tap do |ops| - logger.info( - "event" => "SQSQueueOptionsConfigured", - "options" => ops, - "method" => "#{self.class}#sqs_queue_options" - ) - end - end - - def fallback - { - :queue_owner_aws_account_id => opts.queue[:aws_account_id] - } - end - - def aws_queue - queue_url = sqs_client.queues.url_for( - opts.queue[:sqs_queue_name], sqs_queue_options - ) - sqs_client.queues[queue_url] - end end end end end