lib/alephant/publisher/queue.rb in alephant-publisher-queue-2.2.0 vs lib/alephant/publisher/queue.rb in alephant-publisher-queue-2.3.0
- old
+ new
@@ -2,12 +2,13 @@
require "alephant/publisher/queue/version"
require "alephant/publisher/queue/options"
require "alephant/publisher/queue/sqs_helper/queue"
require "alephant/publisher/queue/sqs_helper/archiver"
-require "alephant/logger"
require "alephant/publisher/queue/processor"
+require "alephant/logger"
+require "alephant/cache"
require "json"
module Alephant
module Publisher
module Queue
@@ -15,22 +16,22 @@
processor ||= Processor.new(opts.writer)
Publisher.new(opts, processor)
end
class Publisher
- include Logger
+ include Alephant::Logger
VISIBILITY_TIMEOUT = 60
RECEIVE_WAIT_TIME = 15
attr_reader :queue, :executor, :opts, :processor
def initialize(opts, processor = nil)
@opts = opts
@processor = processor
- @queue = SQSHelper::Queue.new(
+ @queue = Alephant::Publisher::Queue::SQSHelper::Queue.new(
aws_queue,
archiver,
opts.queue[:visibility_timeout] || VISIBILITY_TIMEOUT,
opts.queue[:receive_wait_time] || RECEIVE_WAIT_TIME
)
@@ -41,11 +42,11 @@
end
private
def archiver
- SQSHelper::Archiver.new(archive_cache, archiver_opts)
+ Alephant::Publisher::Queue::SQSHelper::Archiver.new(archive_cache, archiver_opts)
end
def archiver_opts
options = {
:async_store => true,
@@ -60,17 +61,21 @@
def whitelist_key(options, key)
options.key?(key) && key != :log_validator
end
def archive_cache
- Cache.new(
+ Alephant::Cache.new(
opts.writer[:s3_bucket_id],
opts.writer[:s3_object_path]
)
end
+ def get_region
+ opts.queue[:sqs_account_region] || AWS.config.region
+ end
+
def sqs_client
- @sqs_client ||= AWS::SQS.new
+ @sqs_client ||= AWS::SQS.new(:region => get_region)
end
def sqs_queue_options
(opts.queue[:aws_account_id].nil? ? {} : fallback).tap do |ops|
logger.info(