lib/qpush/server/manager.rb in qpush-0.1.4 vs lib/qpush/server/manager.rb in qpush-0.1.6
- old
+ new
@@ -4,15 +4,15 @@
# of them to start and shutdown.
#
class Manager
include ObjectValidator::Validate
- attr_accessor :workers, :options
+ attr_accessor :configs
attr_reader :forks
- def initialize(options = {})
- options.each { |key, value| send("#{key}=", value) }
+ def initialize(configs)
+ @configs = configs
@master = Process.pid
@forks = []
at_exit { shutdown }
end
@@ -21,58 +21,68 @@
# sleep so that our Workers can do their thing.
#
def start
validate!
start_messages
+ flush_spaces
create_workers
Process.wait
end
# Shutsdown our Worker processes.
#
def shutdown
unless @forks.empty?
- @forks.each { |w| Process.kill('SIGTERM', w[:pid].to_i) }
+ @forks.each { |w| Process.kill('QUIT', w[:pid].to_i) }
end
+ Process.waitall
Process.kill('SIGTERM', @master)
end
private
# Create the specified number of workers and starts them
#
def create_workers
- @workers.times do |id|
- pid = fork { Worker.new(@options.merge(id: id)).start }
+ @configs.each_with_index do |config, id|
+ pid = fork { Worker.new(id, config).start }
@forks << { id: id, pid: pid }
end
end
# Information about the start process
#
def start_messages
- Server.log.info("* Workers: #{@workers}")
- Server.log.info("* Threads: #{@options[:queue_threads]} queue, #{@options[:perform_threads]} perform, #{@options[:delay_threads]} delay")
+ Server.log.info("* Worker count: #{@configs.count}")
end
# Validates our data before starting our Workers. Also instantiates our
# connection pool by pinging Redis.
#
def validate!
return if valid?
fail ServerError, errors.full_messages.join(' ')
end
+
+ # Removes the list of namespaces used by our server from Redis. This
+ # prepares it for the new list that will be created by our workers.
+ #
+ def flush_spaces
+ Server.redis { |c| c.del(QPush::Base::KEY + ':namespaces') }
+ end
end
# The ManagerValidator ensures the data for our manager is valid before
# attempting to start it.
#
class ManagerValidator
include ObjectValidator::Validator
- validates :redis, with: { proc: proc { QPush.redis.with { |c| c.ping && c.quit } },
+ validates :redis, with: { proc: proc { Server.redis { |c| c.ping && c.quit } },
msg: 'could not be connected with' }
- validates :workers, type: Integer, greater_than: 0
- validates :options, type: Hash
+ validates :configs, with: { proc: proc { |m| m.configs.count > 0 },
+ msg: 'were not defined' }
+ validates :configs, with: { proc: proc { |m| m.configs.each { |c| c.is_a?(WorkerConfig) } },
+ msg: 'are not valid WorkerConfig objects' }
end
end
end