Sha256: 9efd1fb0a1b534671ac783ed37c4b559d9626bacaf4e5530a91f9117911fa711
Contents?: true
Size: 2 KB
Versions: 1
Compression:
Stored size: 2 KB
Contents
require_relative 'env'

require 'java'

# Java concurrency primitives used to run publish tasks on a thread pool.
java_import 'java.util.concurrent.ThreadPoolExecutor'
java_import 'java.util.concurrent.TimeUnit'
java_import 'java.util.concurrent.LinkedBlockingQueue'
java_import 'java.util.concurrent.FutureTask'
java_import 'java.util.concurrent.Callable'

require 'alephant/logger'
require "alephant/publisher/version"
require 'alephant/publisher/models/writer'
require 'alephant/publisher/models/queue'

module Alephant
  # Entry point for the publisher: polls a queue and hands each message to a
  # Writer on a Java thread pool.
  module Publisher
    include ::Alephant::Logger

    # Builds a publisher.
    #
    # opts   - Hash of options. Keys read in this file: :sqs_queue_url,
    #          :renderer_pool_min_size, :renderer_pool_max_size,
    #          :render_keep_alive_time, plus the keys listed in
    #          PublishTask::WRITER_OPTION_KEYS (forwarded to Writer).
    # logger - optional logger; when non-nil it is installed via
    #          Alephant::Logger.set_logger.
    #
    # Returns an Alephant::Publisher::Publisher.
    def self.create(opts = {}, logger = nil)
      Publisher.new(opts, logger)
    end

    # NOTE: the original file had a bare `private` here. It does not apply to
    # nested class definitions, so it was dropped; both classes below remain
    # reachable as constants exactly as before.

    # Polls the configured queue and dispatches every message to a PublishTask.
    class Publisher
      attr_reader :queue

      # opts   - options Hash (see Alephant::Publisher.create).
      # logger - optional logger; ignored when nil.
      def initialize(opts, logger)
        ::Alephant::Logger.set_logger(logger) unless logger.nil?

        @opts  = opts
        @queue = Queue.new(opts[:sqs_queue_url])
      end

      # Creates a thread pool, submits one PublishTask per polled message and
      # shuts the pool down once polling finishes.
      #
      # Pool sizing comes from @opts with these defaults: core size 2, maximum
      # size 4, keep-alive 300 seconds.
      #
      # NOTE(review): per the ThreadPoolExecutor documentation, an unbounded
      # LinkedBlockingQueue means no more than the core pool size threads are
      # ever created, so the maximum size (and the keep-alive, which applies to
      # threads above the core size) has no practical effect - confirm whether
      # that is intended.
      #
      # Returns nil.
      def run!
        core_pool_size    = @opts[:renderer_pool_min_size] || 2
        maximum_pool_size = @opts[:renderer_pool_max_size] || 4
        keep_alive_time   = @opts[:render_keep_alive_time] || 300

        executor = ThreadPoolExecutor.new(
          core_pool_size,
          maximum_pool_size,
          keep_alive_time,
          TimeUnit::SECONDS,
          LinkedBlockingQueue.new
        )

        begin
          @queue.poll do |msg|
            # NOTE(review): the FutureTask is not retained, so an exception
            # raised inside the task is captured by the future and never
            # observed here.
            executor.execute(FutureTask.new(PublishTask.new(@opts, msg)))
          end
        ensure
          # Always release the pool, even when polling raises; otherwise its
          # worker threads would be left running. `shutdown` still lets tasks
          # that were already submitted run to completion.
          executor.shutdown
        end

        nil
      end
    end

    # Callable unit of work: writes a single message through a Writer.
    class PublishTask
      include Callable

      # Option keys that are passed through to Writer; everything else in the
      # options Hash is dropped.
      WRITER_OPTION_KEYS = [
        :msg_vary_id_path,
        :sequencer_table_name,
        :sequence_id_path,
        :renderer_id,
        :s3_bucket_id,
        :s3_object_path,
        :view_path,
        :lookup_table_name
      ].freeze

      attr_reader :writer, :msg

      # opts - options Hash; only WRITER_OPTION_KEYS entries reach the Writer.
      # msg  - the message yielded by the queue's poll.
      def initialize(opts, msg)
        @msg    = msg
        @writer = Writer.new(
          opts.select { |key, _value| WRITER_OPTION_KEYS.include?(key) }
        )
      end

      # Invoked by the executor (Callable#call); delegates to Writer#write and
      # returns its result.
      def call
        writer.write(msg)
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
alephant-publisher-0.1.3 | lib/alephant/publisher.rb |