lib/alephant/publisher.rb in alephant-publisher-0.1.3 vs lib/alephant/publisher.rb in alephant-publisher-0.1.4
- old
+ new
@@ -1,92 +1,81 @@
require_relative 'env'
require 'java'
-# setup executor
java_import 'java.util.concurrent.ThreadPoolExecutor'
java_import 'java.util.concurrent.TimeUnit'
java_import 'java.util.concurrent.LinkedBlockingQueue'
java_import 'java.util.concurrent.FutureTask'
-java_import 'java.util.concurrent.Callable'
+require 'alephant/publisher/version'
+require 'alephant/publisher/queue'
+require 'alephant/publisher/publish_task'
require 'alephant/logger'
-require "alephant/publisher/version"
-require 'alephant/publisher/models/writer'
-require 'alephant/publisher/models/queue'
-
module Alephant
module Publisher
include ::Alephant::Logger
def self.create(opts = {}, logger = nil)
Publisher.new(opts, logger)
end
- private
-
class Publisher
- attr_reader :queue
+ VISIBILITY_TIMEOUT = 300
+ KEEP_ALIVE_TIMEOUT = 300
+ RECEIVE_WAIT_TIME = 15
+ POOL_MIN_SIZE = 2
+ POOL_MAX_SIZE = 4
+
+ attr_reader :queue, :executor
+
def initialize(opts, logger)
::Alephant::Logger.set_logger(logger) unless logger.nil?
@opts = opts
@queue = Queue.new(
- opts[:sqs_queue_url]
+ opts[:sqs_queue_url],
+ opts[:visibility_timeout] || VISIBILITY_TIMEOUT,
+ opts[:receive_wait_time] || RECEIVE_WAIT_TIME,
)
- end
- def run!
- core_pool_size = @opts[:renderer_pool_min_size] || 2
- maximum_pool_size = @opts[:renderer_pool_max_size] || 4
- keep_alive_time = @opts[:render_keep_alive_time] || 300
+ @writer_opts = opts.select do |k,v|
+ [
+ :msg_vary_id_path,
+ :sequencer_table_name,
+ :sequence_id_path,
+ :renderer_id,
+ :s3_bucket_id,
+ :s3_object_path,
+ :view_path,
+ :lookup_table_name
+ ].include? k
+ end
- executor = ThreadPoolExecutor.new(
- core_pool_size,
- maximum_pool_size,
- keep_alive_time,
+ @executor = ThreadPoolExecutor.new(
+ @opts[:renderer_pool_min_size] || POOL_MIN_SIZE,
+ @opts[:renderer_pool_max_size] || POOL_MAX_SIZE,
+ @opts[:render_keep_alive_time] || KEEP_ALIVE_TIMEOUT,
TimeUnit::SECONDS,
LinkedBlockingQueue.new
)
+ end
- @queue.poll do |msg|
- task = FutureTask.new(PublishTask.new(@opts, msg))
- executor.execute(task)
+ def run!
+ while true
+ executor.execute(
+ FutureTask.new(
+ PublishTask.new(
+ @writer_opts,
+ @queue.message
+ )
+ )
+ )
end
executor.shutdown()
end
-
- end
-
- class PublishTask
- include Callable
-
- attr_reader :writer, :msg
-
- def initialize(opts,msg)
- @msg = msg
- @writer = Writer.new(
- opts.select do |k,v|
- [
- :msg_vary_id_path,
- :sequencer_table_name,
- :sequence_id_path,
- :renderer_id,
- :s3_bucket_id,
- :s3_object_path,
- :view_path,
- :lookup_table_name
- ].include? k
- end
- )
- end
-
- def call
- writer.write(msg)
- end
-
end
end
end