lib/alephant/publisher.rb in alephant-publisher-0.1.7 vs lib/alephant/publisher.rb in alephant-publisher-0.1.8
- old
+ new
@@ -1,17 +1,10 @@
require_relative 'env'
-require 'java'
-
-java_import 'java.util.concurrent.ThreadPoolExecutor'
-java_import 'java.util.concurrent.TimeUnit'
-java_import 'java.util.concurrent.LinkedBlockingQueue'
-java_import 'java.util.concurrent.FutureTask'
-
require 'alephant/publisher/version'
require 'alephant/publisher/queue'
-require 'alephant/publisher/publish_task'
+require 'alephant/publisher/writer'
require 'alephant/logger'
module Alephant
module Publisher
include ::Alephant::Logger
@@ -19,17 +12,12 @@
def self.create(opts = {}, logger = nil)
Publisher.new(opts, logger)
end
class Publisher
-
VISIBILITY_TIMEOUT = 60
- KEEP_ALIVE_TIMEOUT = 60
RECEIVE_WAIT_TIME = 15
- POOL_MIN_SIZE = 2
- POOL_MAX_SIZE = 4
- QUEUE_THROTTLE = 0.5
attr_reader :queue, :executor
def initialize(opts, logger)
::Alephant::Logger.set_logger(logger) unless logger.nil?
@@ -51,34 +39,23 @@
:s3_object_path,
:view_path,
:lookup_table_name
].include? k
end
-
- @executor = ThreadPoolExecutor.new(
- @opts.fetch(:renderer_pool_min_size, POOL_MIN_SIZE).to_i,
- @opts.fetch(:renderer_pool_max_size, POOL_MAX_SIZE).to_i,
- @opts.fetch(:render_keep_alive_time, KEEP_ALIVE_TIMEOUT).to_i,
- TimeUnit::SECONDS,
- LinkedBlockingQueue.new
- )
end
def run!
- while true
- executor.execute(
- FutureTask.new(
- PublishTask.new(
- @writer_opts,
- @queue.message
- )
- )
- )
+ loop { process(@queue.message) }
+ end
- sleep QUEUE_THROTTLE while executor.getActiveCount == executor.getMaximumPoolSize
- end
+ private
- executor.shutdown()
+ def process(msg)
+ unless msg.nil?
+ Writer.new(@writer_opts, msg).run!
+ msg.delete
+ end
end
+
end
end
end