lib/alephant/publisher.rb in alephant-publisher-0.4.0 vs lib/alephant/publisher.rb in alephant-publisher-0.5.0
- old
+ new
@@ -2,42 +2,44 @@
require 'alephant/publisher/version'
require 'alephant/publisher/options'
require 'alephant/publisher/sqs_helper/queue'
require 'alephant/publisher/sqs_helper/archiver'
-require 'alephant/publisher/writer'
require 'alephant/logger'
require 'alephant/support/aop'
+require 'alephant/publisher/processor'
module Alephant
module Publisher
include Logger
extend Alephant::Support::AOP
- def self.create(opts = {})
- Publisher.new(opts)
+ def self.create(opts = {}, processor = nil)
+ processor ||= Processor.new(opts.writer)
+ Publisher.new(opts, processor)
end
class Publisher
VISIBILITY_TIMEOUT = 60
RECEIVE_WAIT_TIME = 15
- attr_reader :queue, :executor, :opts
+ attr_reader :queue, :executor, :opts, :processor
- def initialize(opts)
+ def initialize(opts, processor = nil)
@opts = opts
+ @processor = processor
@queue = SQSHelper::Queue.new(
aws_queue,
archiver,
opts.queue[:visibility_timeout] || VISIBILITY_TIMEOUT,
opts.queue[:receive_wait_time] || RECEIVE_WAIT_TIME,
)
end
def run!
- loop { process(@queue.message) }
+ loop { processor.consume(@queue.message) }
end
private
def archiver
@@ -51,20 +53,9 @@
)
end
def aws_queue
AWS::SQS.new.queues[opts.queue[:sqs_queue_url]]
- end
-
- def write(msg)
- Writer.new(opts.writer, msg).run!
- end
-
- def process(msg)
- unless msg.nil?
- write msg
- msg.delete
- end
end
end
end
end