lib/alephant/publisher.rb in alephant-publisher-0.1.3 vs lib/alephant/publisher.rb in alephant-publisher-0.1.4

- old
+ new

@@ -1,92 +1,81 @@ require_relative 'env' require 'java' -# setup executor java_import 'java.util.concurrent.ThreadPoolExecutor' java_import 'java.util.concurrent.TimeUnit' java_import 'java.util.concurrent.LinkedBlockingQueue' java_import 'java.util.concurrent.FutureTask' -java_import 'java.util.concurrent.Callable' +require 'alephant/publisher/version' +require 'alephant/publisher/queue' +require 'alephant/publisher/publish_task' require 'alephant/logger' -require "alephant/publisher/version" -require 'alephant/publisher/models/writer' -require 'alephant/publisher/models/queue' - module Alephant module Publisher include ::Alephant::Logger def self.create(opts = {}, logger = nil) Publisher.new(opts, logger) end - private - class Publisher - attr_reader :queue + VISIBILITY_TIMEOUT = 300 + KEEP_ALIVE_TIMEOUT = 300 + RECEIVE_WAIT_TIME = 15 + POOL_MIN_SIZE = 2 + POOL_MAX_SIZE = 4 + + attr_reader :queue, :executor + def initialize(opts, logger) ::Alephant::Logger.set_logger(logger) unless logger.nil? @opts = opts @queue = Queue.new( - opts[:sqs_queue_url] + opts[:sqs_queue_url], + opts[:visibility_timeout] || VISIBILITY_TIMEOUT, + opts[:receive_wait_time] || RECEIVE_WAIT_TIME, ) - end - def run! - core_pool_size = @opts[:renderer_pool_min_size] || 2 - maximum_pool_size = @opts[:renderer_pool_max_size] || 4 - keep_alive_time = @opts[:render_keep_alive_time] || 300 + @writer_opts = opts.select do |k,v| + [ + :msg_vary_id_path, + :sequencer_table_name, + :sequence_id_path, + :renderer_id, + :s3_bucket_id, + :s3_object_path, + :view_path, + :lookup_table_name + ].include? k + end - executor = ThreadPoolExecutor.new( - core_pool_size, - maximum_pool_size, - keep_alive_time, + @executor = ThreadPoolExecutor.new( + @opts[:renderer_pool_min_size] || POOL_MIN_SIZE, + @opts[:renderer_pool_max_size] || POOL_MAX_SIZE, + @opts[:render_keep_alive_time] || KEEP_ALIVE_TIMEOUT, TimeUnit::SECONDS, LinkedBlockingQueue.new ) + end - @queue.poll do |msg| - task = FutureTask.new(PublishTask.new(@opts, msg)) - executor.execute(task) + def run! + while true + executor.execute( + FutureTask.new( + PublishTask.new( + @writer_opts, + @queue.message + ) + ) + ) end executor.shutdown() end - - end - - class PublishTask - include Callable - - attr_reader :writer, :msg - - def initialize(opts,msg) - @msg = msg - @writer = Writer.new( - opts.select do |k,v| - [ - :msg_vary_id_path, - :sequencer_table_name, - :sequence_id_path, - :renderer_id, - :s3_bucket_id, - :s3_object_path, - :view_path, - :lookup_table_name - ].include? k - end - ) - end - - def call - writer.write(msg) - end - end end end