lib/alephant/publisher.rb in alephant-publisher-0.4.0 vs lib/alephant/publisher.rb in alephant-publisher-0.5.0

- old
+ new

@@ -2,42 +2,44 @@ require 'alephant/publisher/version' require 'alephant/publisher/options' require 'alephant/publisher/sqs_helper/queue' require 'alephant/publisher/sqs_helper/archiver' -require 'alephant/publisher/writer' require 'alephant/logger' require 'alephant/support/aop' +require 'alephant/publisher/processor' module Alephant module Publisher include Logger extend Alephant::Support::AOP - def self.create(opts = {}) - Publisher.new(opts) + def self.create(opts = {}, processor = nil) + processor ||= Processor.new(opts.writer) + Publisher.new(opts, processor) end class Publisher VISIBILITY_TIMEOUT = 60 RECEIVE_WAIT_TIME = 15 - attr_reader :queue, :executor, :opts + attr_reader :queue, :executor, :opts, :processor - def initialize(opts) + def initialize(opts, processor = nil) @opts = opts + @processor = processor @queue = SQSHelper::Queue.new( aws_queue, archiver, opts.queue[:visibility_timeout] || VISIBILITY_TIMEOUT, opts.queue[:receive_wait_time] || RECEIVE_WAIT_TIME, ) end def run! - loop { process(@queue.message) } + loop { processor.consume(@queue.message) } end private def archiver @@ -51,20 +53,9 @@ ) end def aws_queue AWS::SQS.new.queues[opts.queue[:sqs_queue_url]] - end - - def write(msg) - Writer.new(opts.writer, msg).run! - end - - def process(msg) - unless msg.nil? - write msg - msg.delete - end end end end end