Sha256: d76cd0b787ff77aff0841b253890aa948cab2975c615f7bdffdd51e06216b5ce
Contents?: true
Size: 1.74 KB
Versions: 3
Compression:
Stored size: 1.74 KB
Contents
require_relative 'env'

require 'alephant/publisher/version'
require 'alephant/publisher/options'
require 'alephant/publisher/sqs_helper/queue'
require 'alephant/publisher/sqs_helper/archiver'
require 'alephant/logger'
require 'alephant/support/aop'
require 'alephant/publisher/processor'
require 'alephant/publisher/views'
require 'alephant/publisher/views/base'
require 'json'

module Alephant
  # Entry point of the publisher: wires an SQS-backed queue to a processor
  # and feeds every received message to that processor.
  module Publisher
    include Logger
    extend Alephant::Support::AOP

    # Builds a ready-to-run Publisher.
    #
    # @param opts      options object; must respond to +writer+ and +queue+
    # @param processor object responding to +consume+; when omitted, a
    #   Processor is built from +opts.writer+
    # @return [Alephant::Publisher::Publisher]
    #
    # NOTE(review): the +{}+ default for +opts+ does not respond to +writer+
    # or +queue+, so calling this without an options object looks like it
    # would raise NoMethodError -- confirm the intended default.
    def self.create(opts = {}, processor = nil)
      processor = Processor.new(opts.writer) unless processor
      Publisher.new(opts, processor)
    end

    # Pulls messages from an SQS queue and hands each one to the processor.
    class Publisher
      # Fallbacks used when the queue options do not supply a value.
      VISIBILITY_TIMEOUT = 60
      RECEIVE_WAIT_TIME  = 15

      # NOTE(review): nothing in this class assigns @executor, so the
      # +executor+ reader returns nil here.
      attr_reader :queue, :executor, :opts, :processor

      # @param opts      options object responding to +queue+ and +writer+
      #   (both indexed with symbol keys)
      # @param processor object responding to +consume+
      def initialize(opts, processor = nil)
        @opts      = opts
        @processor = processor

        # Evaluated in the same order as the positional arguments they feed.
        source_queue       = aws_queue
        message_archiver   = archiver
        visibility_timeout = opts.queue[:visibility_timeout] || VISIBILITY_TIMEOUT
        receive_wait_time  = opts.queue[:receive_wait_time] || RECEIVE_WAIT_TIME

        @queue = SQSHelper::Queue.new(
          source_queue,
          message_archiver,
          visibility_timeout,
          receive_wait_time
        )
      end

      # Loops forever, passing each message read from the queue to the
      # processor. Does not return on its own.
      def run!
        loop do
          processor.consume(@queue.message)
        end
      end

      private

      # Archiver handed to the queue helper, backed by the archive cache.
      def archiver
        SQSHelper::Archiver.new(archive_cache)
      end

      # Cache built from the writer's S3 bucket id and object path.
      def archive_cache
        Cache.new(opts.writer[:s3_bucket_id], opts.writer[:s3_object_path])
      end

      # Memoized SQS client.
      def sqs_client
        @sqs_client ||= AWS::SQS.new
      end

      # Extra options for the queue URL lookup: the owning account id when
      # one is configured, otherwise an empty hash.
      def sqs_queue_options
        account_id = opts.queue[:aws_account_id]
        return {} if account_id.nil?

        { queue_owner_aws_account_id: account_id }
      end

      # Resolves the configured queue name to its URL, then returns the
      # queue object stored under that URL.
      def aws_queue
        queue_name = opts.queue[:sqs_queue_name]
        url = sqs_client.queues.url_for(queue_name, sqs_queue_options)
        sqs_client.queues[url]
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
alephant-publisher-0.6.10 | lib/alephant/publisher.rb |
alephant-publisher-0.6.9 | lib/alephant/publisher.rb |
alephant-publisher-0.6.8 | lib/alephant/publisher.rb |