lib/alephant/publisher.rb in alephant-publisher-0.1.7 vs lib/alephant/publisher.rb in alephant-publisher-0.1.8

- old
+ new

@@ -1,17 +1,10 @@ require_relative 'env' -require 'java' - -java_import 'java.util.concurrent.ThreadPoolExecutor' -java_import 'java.util.concurrent.TimeUnit' -java_import 'java.util.concurrent.LinkedBlockingQueue' -java_import 'java.util.concurrent.FutureTask' - require 'alephant/publisher/version' require 'alephant/publisher/queue' -require 'alephant/publisher/publish_task' +require 'alephant/publisher/writer' require 'alephant/logger' module Alephant module Publisher include ::Alephant::Logger @@ -19,17 +12,12 @@ def self.create(opts = {}, logger = nil) Publisher.new(opts, logger) end class Publisher - VISIBILITY_TIMEOUT = 60 - KEEP_ALIVE_TIMEOUT = 60 RECEIVE_WAIT_TIME = 15 - POOL_MIN_SIZE = 2 - POOL_MAX_SIZE = 4 - QUEUE_THROTTLE = 0.5 attr_reader :queue, :executor def initialize(opts, logger) ::Alephant::Logger.set_logger(logger) unless logger.nil? @@ -51,34 +39,23 @@ :s3_object_path, :view_path, :lookup_table_name ].include? k end - - @executor = ThreadPoolExecutor.new( - @opts.fetch(:renderer_pool_min_size, POOL_MIN_SIZE).to_i, - @opts.fetch(:renderer_pool_max_size, POOL_MAX_SIZE).to_i, - @opts.fetch(:render_keep_alive_time, KEEP_ALIVE_TIMEOUT).to_i, - TimeUnit::SECONDS, - LinkedBlockingQueue.new - ) end def run! - while true - executor.execute( - FutureTask.new( - PublishTask.new( - @writer_opts, - @queue.message - ) - ) - ) + loop { process(@queue.message) } + end - sleep QUEUE_THROTTLE while executor.getActiveCount == executor.getMaximumPoolSize - end + private - executor.shutdown() + def process(msg) + unless msg.nil? + Writer.new(@writer_opts, msg).run! + msg.delete + end end + end end end