lib/qpush/server/manager.rb in qpush-0.1.4 vs lib/qpush/server/manager.rb in qpush-0.1.6

- old
+ new

@@ -4,15 +4,15 @@ # of them to start and shutdown. # class Manager include ObjectValidator::Validate - attr_accessor :workers, :options + attr_accessor :configs attr_reader :forks - def initialize(options = {}) - options.each { |key, value| send("#{key}=", value) } + def initialize(configs) + @configs = configs @master = Process.pid @forks = [] at_exit { shutdown } end @@ -21,58 +21,68 @@ # sleep so that our Workers can do their thing. # def start validate! start_messages + flush_spaces create_workers Process.wait end # Shutsdown our Worker processes. # def shutdown unless @forks.empty? - @forks.each { |w| Process.kill('SIGTERM', w[:pid].to_i) } + @forks.each { |w| Process.kill('QUIT', w[:pid].to_i) } end + Process.waitall Process.kill('SIGTERM', @master) end private # Create the specified number of workers and starts them # def create_workers - @workers.times do |id| - pid = fork { Worker.new(@options.merge(id: id)).start } + @configs.each_with_index do |config, id| + pid = fork { Worker.new(id, config).start } @forks << { id: id, pid: pid } end end # Information about the start process # def start_messages - Server.log.info("* Workers: #{@workers}") - Server.log.info("* Threads: #{@options[:queue_threads]} queue, #{@options[:perform_threads]} perform, #{@options[:delay_threads]} delay") + Server.log.info("* Worker count: #{@configs.count}") end # Validates our data before starting our Workers. Also instantiates our # connection pool by pinging Redis. # def validate! return if valid? fail ServerError, errors.full_messages.join(' ') end + + # Removes the list of namespaces used by our server from Redis. This + # prepares it for the new list that will be created by our workers. + # + def flush_spaces + Server.redis { |c| c.del(QPush::Base::KEY + ':namespaces') } + end end # The ManagerValidator ensures the data for our manager is valid before # attempting to start it. # class ManagerValidator include ObjectValidator::Validator - validates :redis, with: { proc: proc { QPush.redis.with { |c| c.ping && c.quit } }, + validates :redis, with: { proc: proc { Server.redis { |c| c.ping && c.quit } }, msg: 'could not be connected with' } - validates :workers, type: Integer, greater_than: 0 - validates :options, type: Hash + validates :configs, with: { proc: proc { |m| m.configs.count > 0 }, + msg: 'were not defined' } + validates :configs, with: { proc: proc { |m| m.configs.each { |c| c.is_a?(WorkerConfig) } }, + msg: 'are not valid WorkerConfig objects' } end end end