lib/arachni/rpc/server/dispatcher.rb in arachni-0.4.7 vs lib/arachni/rpc/server/dispatcher.rb in arachni-1.0

- old
+ new

@@ -1,35 +1,25 @@ =begin - Copyright 2010-2014 Tasos Laskos <tasos.laskos@gmail.com> + Copyright 2010-2014 Tasos Laskos <tasos.laskos@arachni-scanner.com> - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + This file is part of the Arachni Framework project and is subject to + redistribution and commercial restrictions. Please see the Arachni Framework + web site for more information on licensing and terms of use. =end -require 'socket' -require 'sys/proctable' - module Arachni -require Options.dir['lib'] + 'rpc/client' -require Options.dir['lib'] + 'rpc/server/base' -require Options.dir['lib'] + 'rpc/server/instance' -require Options.dir['lib'] + 'rpc/server/output' +lib = Options.paths.lib +require lib + 'processes/manager' +require lib + 'rpc/client' +require lib + 'rpc/server/base' +require lib + 'rpc/server/instance' +require lib + 'rpc/server/output' module RPC class Server -# # Dispatches RPC Instances on demand providing a centralized environment # for multiple clients and allows for extensive process monitoring. # # The process goes something like this: # @@ -41,86 +31,79 @@ # * The client connects to the Instance using these credentials. # # Once the client finishes using the RPC Instance he *must* shut it down # otherwise the system will be eaten away by zombie RPC Instance processes. # -# @author Tasos "Zapotek" Laskos <tasos.laskos@gmail.com> -# +# @author Tasos "Zapotek" Laskos <tasos.laskos@arachni-scanner.com> class Dispatcher - require Options.dir['lib'] + 'rpc/server/dispatcher/node' - require Options.dir['lib'] + 'rpc/server/dispatcher/handler' + require Options.paths.lib + 'rpc/server/dispatcher/node' + require Options.paths.lib + 'rpc/server/dispatcher/service' include Utilities include UI::Output - include ::Sys - HANDLER_NAMESPACE = Handler + SERVICE_NAMESPACE = Service - def initialize( opts = Options.instance ) - banner + def initialize( options = Options.instance ) + @options = options - @opts = opts + @options.dispatcher.external_address ||= @options.rpc.server_address + @options.snapshot.save_path ||= @options.paths.snapshots - @opts.rpc_port ||= 7331 - @opts.rpc_address ||= 'localhost' - @opts.rpc_external_address ||= @opts.rpc_address - @opts.pool_size ||= 5 + @server = Base.new( @options ) + @server.logger.level = @options.datastore.log_level if @options.datastore.log_level - if @opts.help - print_help - exit 0 - end - - @server = Base.new( @opts ) - @server.logger.level = @opts.datastore[:log_level] if @opts.datastore[:log_level] - @server.add_async_check do |method| # methods that expect a block are async method.parameters.flatten.include? :block end - @url = "#{@opts.rpc_external_address}:#{@opts.rpc_port.to_s}" + @url = "#{@options.dispatcher.external_address}:#{@options.rpc.server_port}" # let the instances in the pool know who to ask for routing instructions # when we're in grid mode. - @opts.datastore[:dispatcher_url] = @url.dup + @options.datastore.dispatcher_url = @url prep_logging - print_status 'Initing RPC Server...' + print_status 'Starting the RPC Server...' @server.add_handler( 'dispatcher', self ) # trap interrupts and exit cleanly when required trap_interrupts { shutdown } @jobs = [] @consumed_pids = [] - @pool = ::EM::Queue.new + @pool = Reactor.global.create_queue - if @opts.pool_size > 0 - print_status 'Warming up the pool...' - @opts.pool_size.times { add_instance_to_pool } + if @options.dispatcher.pool_size > 0 + @options.dispatcher.pool_size.times { add_instance_to_pool( false ) } end - print_status 'Initialization complete.' + # Check up on the pool and start the server once it has been filled. + Reactor.global.at_interval( 0.1 ) do |task| + next if @options.dispatcher.pool_size != @pool.size + task.done - @node = Node.new( @opts, @logfile ) - @server.add_handler( 'node', @node ) + _services.each do |name, service| + @server.add_handler( name, service.new( @options, self ) ) + end - _handlers.each do |name, handler| - @server.add_handler( name, handler.new( @opts, self ) ) - end + @node = Node.new( @options, @logfile ) + @server.add_handler( 'node', @node ) - run + run + end end - def handlers - _handlers.keys + def services + _services.keys end - # @return [TrueClass] true + # @return [TrueClass] + # true def alive? @server.alive? end # @return [String] @@ -132,246 +115,181 @@ return end each = proc do |neighbour, iter| connect_to_peer( neighbour ).workload_score do |score| - iter.return score.rpc_exception? ? nil : [neighbour, score] + iter.return (!score || score.rpc_exception?) ? nil : [neighbour, score] end end after = proc do |nodes| nodes.compact! nodes << [@url, workload_score] block.call nodes.sort_by { |_, score| score }[0][0] end - ::EM::Iterator.new( @node.neighbours ).map( each, after ) + Reactor.global.create_iterator( @node.neighbours ).map( each, after ) end + # Dispatches an {Instance} from the pool. # - # Dispatches an Instance from the pool. - # - # @param [String] owner An owner to assign to the Instance. - # @param [Hash] helpers Hash of helper data to be added to the job. + # @param [String] owner + # An owner to assign to the {Instance}. + # @param [Hash] helpers + # Hash of helper data to be added to the job. # @param [Boolean] load_balance - # Return an Instance from the least burdened Dispatcher (when in Grid mode) + # Return an {Instance} from the least burdened {Dispatcher} (when in Grid mode) # or from this one directly? # - # @return [Hash] Includes port number, owner, clock info and proc info. + # @return [Hash, false, nil] + # Depending on availability: # + # * `Hash`: Includes URL, owner, clock info and proc info. + # * `false`: Pool is currently empty, check back again in a few seconds. + # * `nil`: The {Dispatcher} was configured with a pool-size of `0`. def dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block ) if load_balance && @node.grid_member? preferred do |url| connect_to_peer( url ).dispatch( owner, helpers, false, &block ) end return end - if @opts.pool_size <= 0 - block.call false + if @options.dispatcher.pool_size <= 0 + block.call nil return end - @pool.pop do |cjob| - cjob['owner'] = owner.to_s - cjob['starttime'] = Time.now - cjob['helpers'] = helpers + if @pool.empty? + block.call false + else + @pool.pop do |cjob| + cjob['owner'] = owner.to_s + cjob['starttime'] = Time.now.to_s + cjob['helpers'] = helpers - print_status "Instance dispatched -- PID: #{cjob['pid']} - " + - "Port: #{cjob['port']} - Owner: #{cjob['owner']}" + print_status "Instance dispatched -- PID: #{cjob['pid']} - " + + "Port: #{cjob['port']} - Owner: #{cjob['owner']}" - @jobs << cjob - block.call cjob + @jobs << cjob + block.call cjob + end end - ::EM.next_tick { add_instance_to_pool } + Reactor.global.schedule { add_instance_to_pool } end - # # Returns proc info for a given pid # # @param [Fixnum] pid # # @return [Hash] - # def job( pid ) @jobs.each do |j| next if j['pid'] != pid cjob = j.dup - cjob['currtime'] = Time.now - cjob['age'] = cjob['currtime'] - cjob['birthdate'] - cjob['runtime'] = cjob['currtime'] - cjob['starttime'] - cjob['proc'] = proc_hash( cjob['pid'] ) + currtime = Time.now + cjob['currtime'] = currtime.to_s + cjob['age'] = currtime - Time.parse( cjob['birthdate'] ) + cjob['runtime'] = currtime - Time.parse( cjob['starttime'] ) + cjob['alive'] = !!Process.kill( 0, pid ) rescue false + return cjob end end - # @return [Array<Hash>] Returns proc info for all jobs. + # @return [Array<Hash>] + # Returns info for all jobs. def jobs @jobs.map { |cjob| job( cjob['pid'] ) }.compact end + # @return [Array<Hash>] + # Returns info for all running jobs. # - # @return [Array<Hash>] Returns proc info for all running jobs. - # # @see #jobs - # def running_jobs - jobs.reject { |job| job['proc'].empty? } + jobs.select { |job| job['alive'] } end + # @return [Array<Hash>] + # Returns info for all finished jobs. # - # @return [Array<Hash>] Returns proc info for all finished jobs. - # # @see #jobs - # def finished_jobs - jobs.select { |job| job['proc'].empty? } + jobs.reject { |job| job['alive'] } end # @return [Float] # Workload score for this Dispatcher, calculated using the number # of {#running_jobs} and the configured node weight. # # Lower is better. - # def workload_score score = (running_jobs.size + 1).to_f score *= @node.info['weight'].to_f if @node.info['weight'] score end # @return [Hash] # Returns server stats regarding the jobs and pool. - def stats + def statistics stats_h = { 'running_jobs' => running_jobs, 'finished_jobs' => finished_jobs, - 'init_pool_size' => @opts.pool_size, + 'init_pool_size' => @options.dispatcher.pool_size, 'curr_pool_size' => @pool.size, - 'consumed_pids' => @consumed_pids + 'consumed_pids' => @consumed_pids, + 'snapshots' => Dir.glob( "#{@options.snapshot.save_path}*.afs" ) } stats_h.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours ) stats_h['node']['score'] = workload_score stats_h end - # @return [String] contents of the log file + # @return [String] + # Contents of the log file def log IO.read prep_logging end - # @return [Hash] the server's proc info - def proc_info - proc_hash( Process.pid ).merge( 'node' => @node.info ) + # @private + def pid + Process.pid end private - def self._handlers - @handlers ||= nil - return @handlers if @handlers + def self._services + @services ||= nil + return @services if @services - @handlers = Component::Manager.new( Options.dir['rpcd_handlers'], HANDLER_NAMESPACE ) - @handlers.load_all - @handlers + @services = Component::Manager.new( Options.paths.services, SERVICE_NAMESPACE ) + @services.load_all + @services end - def _handlers - self.class._handlers + def _services + self.class._services end - # - # Outputs the Arachni banner.<br/> - # Displays version number, revision number, author details etc. - # - def banner - puts BANNER - puts - puts - end - - def print_help - puts <<USAGE - Usage: arachni_rpcd \[options\] - - Supported options: - - -h - --help output this - - --address=<host> specify address to bind to - (Default: #{@opts.rpc_address}) - - --external-address=<host> specify the external address used to access this Dispatcher - (Defaults to the value of '--address'.) - - --port=<num> specify port to listen to - (Default: #{@opts.rpc_port}) - - --port-range=<beginning>-<end> - - specify port range for the RPC instances - (Make sure to allow for a few hundred ports.) - (Default: #{@opts.rpc_instance_port_range.join( '-' )}) - - --reroute-to-logfile reroute all output to a logfile under 'logs/' - - --pool-size=<num> how many server workers/processes should be available - at any given moment (Default: #{@opts.pool_size}) - - --neighbour=<URL> URL of a neighbouring Dispatcher (used to build a grid) - - --weight=<float> weight of the Dispatcher - - --pipe-id=<string> bandwidth pipe identification - - --nickname=<string> nickname of the Dispatcher - - --debug - - - SSL -------------------------- - - (All SSL options will be honored by the dispatched RPC instances as well.) - (Do *not* use encrypted keys!) - - --ssl-pkey <file> location of the server SSL private key (.pem) - (Used to verify the server to the clients.) - - --ssl-cert <file> location of the server SSL certificate (.pem) - (Used to verify the server to the clients.) - - --node-ssl-pkey <file> location of the client SSL private key (.pem) - (Used to verify this node to other servers.) - - --node-ssl-cert <file> location of the client SSL certificate (.pem) - (Used to verify this node to other servers.) - - --ssl-ca <file> location of the CA certificate (.pem) - -USAGE - end - - def trap_interrupts( &block ) %w(QUIT INT).each do |signal| trap( signal, &block || Proc.new{ } ) if Signal.list.has_key?( signal ) end end # Starts the dispatcher's server def run - print_status 'Starting the server...' + print_status 'Ready' @server.start rescue => e - print_error e.to_s - print_error_backtrace e + print_exception e $stderr.puts "Could not start server, for details see: #{@logfile}" # If the server fails to start kill the pool Instances # to prevent zombie processes. @@ -379,11 +297,11 @@ exit 1 end def shutdown print_status 'Shutting down...' - @server.shutdown + Arachni::Reactor.global.stop end def kill( pid ) begin 10.times { Process.kill( 'KILL', pid ) } @@ -391,53 +309,49 @@ rescue Errno::ESRCH return true end end - def add_instance_to_pool + def add_instance_to_pool( one_at_a_time = true ) + return if @operation_in_progress && one_at_a_time + @operation_in_progress = true + owner = 'dispatcher' - exception_jail { + port = available_port + token = generate_token - # get an available port for the child - port = available_port - token = generate_token + pid = Processes::Manager.spawn( :instance, port: port, token: token ) + Process.detach( pid ) + @consumed_pids << pid - pid = fork do - @opts.rpc_port = port - Server::Instance.new( @opts, token ) - end + print_status "Instance added to pool -- PID: #{pid} - " + + "Port: #{port} - Owner: #{owner}" - print_status "Instance added to pool -- PID: #{pid} - " + - "Port: #{port} - Owner: #{owner}" + url = "#{@options.dispatcher.external_address}:#{port}" + # Wait until the Instance has booted before adding it to the pool. + Client::Instance.when_ready( url, token ) do + @operation_in_progress = false + @pool << { 'token' => token, 'pid' => pid, 'port' => port, - 'url' => "#{@opts.rpc_external_address}:#{port}", + 'url' => url, 'owner' => owner, - 'birthdate' => Time.now + 'birthdate' => Time.now.to_s } - - @consumed_pids << pid - - # let the child go about its business - Process.detach( pid ) - } + end end def prep_logging # reroute all output to a logfile - @logfile ||= reroute_to_file( @opts.dir['logs'] + - "/Dispatcher - #{Process.pid}-#{@opts.rpc_port}.log" ) + @logfile ||= reroute_to_file( @options.paths.logs + + "/Dispatcher - #{Process.pid}-#{@options.rpc.server_port}.log" ) end - def proc_hash( pid ) - struct_to_h( ProcTable.ps( pid ) ) - end - def connect_to_peer( url ) - Client::Dispatcher.new( @opts, url ) + Client::Dispatcher.new( @options, url ) end def struct_to_h( struct ) hash = {} return hash if !struct