lib/qpush/server/worker.rb in qpush-0.1.4 vs lib/qpush/server/worker.rb in qpush-0.1.6

- old
+ new

@@ -1,93 +1,127 @@ + module QPush module Server - # The Worker manages our workers - Queue, Delay, and Perform. Each of these - # workers is alloted a number of threads. Each worker object maintains - # control of these threads through the aptly named start and shutdown - # methods. + # The Worker manages our workers - Queue, Delay, Perform and Heartbeat. + # Each of these workers is alloted a number of threads. Each worker + # object maintains control of these threads through the aptly named start + # and shutdown methods. # class Worker + extend Forwardable include ObjectValidator::Validate - attr_accessor :perform_threads, :queue_threads, :delay_threads, :id + attr_reader :config, :pid, :id - def initialize(options = {}) - options.each { |key, value| send("#{key}=", value) } + def_delegators :@config, :perform_threads, :delay_threads, :queue_threads, + :priorities, :base_threads, :namespace + + def initialize(id, config) + @id = id @pid = Process.pid - @workers = [] + @config = config + @actions = [] @threads = [] at_exit { shutdown } end # Starts our new worker. # def start validate! + assign_globals + register_space start_message build_threads start_threads end # Shutsdown our worker as well as its threads. # def shutdown shutdown_message - @workers.each(&:shutdown) + @actions.each(&:shutdown) @threads.each(&:exit) end private - # Forks the worker and creates the actual threads (@_threads_real) for + # Forks the worker and creates the actual threads (@threads) for # our Queue and Retry objects. We then start them and join them to the # main process. # def start_threads - @workers.each do |worker| - @threads << Thread.new { worker.start } + @actions.each do |action| + @threads << Thread.new { action.start } end @threads.map(&:join) end - # Instantiates our Queue, Perform, and Delay objects based on the number - # of threads specified for each process type. We store these objects as - # an array in @threads. + # Instantiates our Queue, Perform, Delay and Heartbeat objects based on + # the number of threads specified for each process type. We store these + # objects as an array in @actions. # def build_threads - @perform_threads.times { @workers << Perform.new } - @queue_threads.times { @workers << Queue.new } - @delay_threads.times { @workers << Delay.new } - @workers << Heartbeat.new + base_threads.each do |thread| + thread[:count].times do + @actions << thread[:klass].new + end + end end + def base_threads + [ + { klass: Perform, count: perform_threads }, + { klass: Queue, count: queue_threads }, + { klass: Delay, count: delay_threads }, + { klass: Heartbeat, count: 1 } + ] + end + # Information about the start process # def start_message - Server.log.info("* Worker #{@id} started, pid: #{@pid}") + Server.log.info("* Worker #{@id} started | pid: #{@pid} | namespace: #{namespace}") end # Information about the shutdown process # def shutdown_message - Server.log.info("* Worker #{@id} shutdown, pid: #{@pid}") + Server.log.info("* Worker #{@id} shutdown | pid: #{@pid}") end # Validates our data before starting the worker. # def validate! return if valid? fail ServerError, errors.full_messages.join(' ') end + + def assign_globals + Server.keys = RedisKeys.new(@config.for_keys) + Server.worker = self + end + + # Registers our workers namespace on Redis + # + def register_space + Server.redis do |c| + c.sadd(QPush::Base::KEY + ':namespaces', namespace) + end + end end - # The WorkerValidator ensures the data for our worker is valid before - # attempting to start it. + # The WorkerValidator ensures the data for our worker is + # valid before attempting to use it. # class WorkerValidator include ObjectValidator::Validator + validates :config, type: QPush::Server::WorkerConfig validates :perform_threads, type: Integer, greater_than: 0 validates :queue_threads, type: Integer, greater_than: 0 validates :delay_threads, type: Integer, greater_than: 0 + validates :namespace, type: String + validates :priorities, type: Integer, greater_than: 4 end end end