lib/arachni/rpc/server/dispatcher.rb in arachni-0.4.7 vs lib/arachni/rpc/server/dispatcher.rb in arachni-1.0
- old
+ new
@@ -1,35 +1,25 @@
=begin
- Copyright 2010-2014 Tasos Laskos <tasos.laskos@gmail.com>
+ Copyright 2010-2014 Tasos Laskos <tasos.laskos@arachni-scanner.com>
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ This file is part of the Arachni Framework project and is subject to
+ redistribution and commercial restrictions. Please see the Arachni Framework
+ web site for more information on licensing and terms of use.
=end
-require 'socket'
-require 'sys/proctable'
-
module Arachni
-require Options.dir['lib'] + 'rpc/client'
-require Options.dir['lib'] + 'rpc/server/base'
-require Options.dir['lib'] + 'rpc/server/instance'
-require Options.dir['lib'] + 'rpc/server/output'
+lib = Options.paths.lib
+require lib + 'processes/manager'
+require lib + 'rpc/client'
+require lib + 'rpc/server/base'
+require lib + 'rpc/server/instance'
+require lib + 'rpc/server/output'
module RPC
class Server
-#
# Dispatches RPC Instances on demand providing a centralized environment
# for multiple clients and allows for extensive process monitoring.
#
# The process goes something like this:
#
@@ -41,86 +31,79 @@
# * The client connects to the Instance using these credentials.
#
# Once the client finishes using the RPC Instance he *must* shut it down
# otherwise the system will be eaten away by zombie RPC Instance processes.
#
-# @author Tasos "Zapotek" Laskos <tasos.laskos@gmail.com>
-#
+# @author Tasos "Zapotek" Laskos <tasos.laskos@arachni-scanner.com>
class Dispatcher
- require Options.dir['lib'] + 'rpc/server/dispatcher/node'
- require Options.dir['lib'] + 'rpc/server/dispatcher/handler'
+ require Options.paths.lib + 'rpc/server/dispatcher/node'
+ require Options.paths.lib + 'rpc/server/dispatcher/service'
include Utilities
include UI::Output
- include ::Sys
- HANDLER_NAMESPACE = Handler
+ SERVICE_NAMESPACE = Service
- def initialize( opts = Options.instance )
- banner
+ def initialize( options = Options.instance )
+ @options = options
- @opts = opts
+ @options.dispatcher.external_address ||= @options.rpc.server_address
+ @options.snapshot.save_path ||= @options.paths.snapshots
- @opts.rpc_port ||= 7331
- @opts.rpc_address ||= 'localhost'
- @opts.rpc_external_address ||= @opts.rpc_address
- @opts.pool_size ||= 5
+ @server = Base.new( @options )
+ @server.logger.level = @options.datastore.log_level if @options.datastore.log_level
- if @opts.help
- print_help
- exit 0
- end
-
- @server = Base.new( @opts )
- @server.logger.level = @opts.datastore[:log_level] if @opts.datastore[:log_level]
-
@server.add_async_check do |method|
# methods that expect a block are async
method.parameters.flatten.include? :block
end
- @url = "#{@opts.rpc_external_address}:#{@opts.rpc_port.to_s}"
+ @url = "#{@options.dispatcher.external_address}:#{@options.rpc.server_port}"
# let the instances in the pool know who to ask for routing instructions
# when we're in grid mode.
- @opts.datastore[:dispatcher_url] = @url.dup
+ @options.datastore.dispatcher_url = @url
prep_logging
- print_status 'Initing RPC Server...'
+ print_status 'Starting the RPC Server...'
@server.add_handler( 'dispatcher', self )
# trap interrupts and exit cleanly when required
trap_interrupts { shutdown }
@jobs = []
@consumed_pids = []
- @pool = ::EM::Queue.new
+ @pool = Reactor.global.create_queue
- if @opts.pool_size > 0
- print_status 'Warming up the pool...'
- @opts.pool_size.times { add_instance_to_pool }
+ if @options.dispatcher.pool_size > 0
+ @options.dispatcher.pool_size.times { add_instance_to_pool( false ) }
end
- print_status 'Initialization complete.'
+ # Check up on the pool and start the server once it has been filled.
+ Reactor.global.at_interval( 0.1 ) do |task|
+ next if @options.dispatcher.pool_size != @pool.size
+ task.done
- @node = Node.new( @opts, @logfile )
- @server.add_handler( 'node', @node )
+ _services.each do |name, service|
+ @server.add_handler( name, service.new( @options, self ) )
+ end
- _handlers.each do |name, handler|
- @server.add_handler( name, handler.new( @opts, self ) )
- end
+ @node = Node.new( @options, @logfile )
+ @server.add_handler( 'node', @node )
- run
+ run
+ end
end
- def handlers
- _handlers.keys
+ def services
+ _services.keys
end
- # @return [TrueClass] true
+ # @return [TrueClass]
+ # true
def alive?
@server.alive?
end
# @return [String]
@@ -132,246 +115,181 @@
return
end
each = proc do |neighbour, iter|
connect_to_peer( neighbour ).workload_score do |score|
- iter.return score.rpc_exception? ? nil : [neighbour, score]
+ iter.return (!score || score.rpc_exception?) ? nil : [neighbour, score]
end
end
after = proc do |nodes|
nodes.compact!
nodes << [@url, workload_score]
block.call nodes.sort_by { |_, score| score }[0][0]
end
- ::EM::Iterator.new( @node.neighbours ).map( each, after )
+ Reactor.global.create_iterator( @node.neighbours ).map( each, after )
end
+ # Dispatches an {Instance} from the pool.
#
- # Dispatches an Instance from the pool.
- #
- # @param [String] owner An owner to assign to the Instance.
- # @param [Hash] helpers Hash of helper data to be added to the job.
+ # @param [String] owner
+ # An owner to assign to the {Instance}.
+ # @param [Hash] helpers
+ # Hash of helper data to be added to the job.
# @param [Boolean] load_balance
- # Return an Instance from the least burdened Dispatcher (when in Grid mode)
+ # Return an {Instance} from the least burdened {Dispatcher} (when in Grid mode)
# or from this one directly?
#
- # @return [Hash] Includes port number, owner, clock info and proc info.
+ # @return [Hash, false, nil]
+ # Depending on availability:
#
+ # * `Hash`: Includes URL, owner, clock info and proc info.
+ # * `false`: Pool is currently empty, check back again in a few seconds.
+ # * `nil`: The {Dispatcher} was configured with a pool-size of `0`.
def dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block )
if load_balance && @node.grid_member?
preferred do |url|
connect_to_peer( url ).dispatch( owner, helpers, false, &block )
end
return
end
- if @opts.pool_size <= 0
- block.call false
+ if @options.dispatcher.pool_size <= 0
+ block.call nil
return
end
- @pool.pop do |cjob|
- cjob['owner'] = owner.to_s
- cjob['starttime'] = Time.now
- cjob['helpers'] = helpers
+ if @pool.empty?
+ block.call false
+ else
+ @pool.pop do |cjob|
+ cjob['owner'] = owner.to_s
+ cjob['starttime'] = Time.now.to_s
+ cjob['helpers'] = helpers
- print_status "Instance dispatched -- PID: #{cjob['pid']} - " +
- "Port: #{cjob['port']} - Owner: #{cjob['owner']}"
+ print_status "Instance dispatched -- PID: #{cjob['pid']} - " +
+ "Port: #{cjob['port']} - Owner: #{cjob['owner']}"
- @jobs << cjob
- block.call cjob
+ @jobs << cjob
+ block.call cjob
+ end
end
- ::EM.next_tick { add_instance_to_pool }
+ Reactor.global.schedule { add_instance_to_pool }
end
- #
# Returns proc info for a given pid
#
# @param [Fixnum] pid
#
# @return [Hash]
- #
def job( pid )
@jobs.each do |j|
next if j['pid'] != pid
cjob = j.dup
- cjob['currtime'] = Time.now
- cjob['age'] = cjob['currtime'] - cjob['birthdate']
- cjob['runtime'] = cjob['currtime'] - cjob['starttime']
- cjob['proc'] = proc_hash( cjob['pid'] )
+ currtime = Time.now
+ cjob['currtime'] = currtime.to_s
+ cjob['age'] = currtime - Time.parse( cjob['birthdate'] )
+ cjob['runtime'] = currtime - Time.parse( cjob['starttime'] )
+ cjob['alive'] = !!Process.kill( 0, pid ) rescue false
+
return cjob
end
end
- # @return [Array<Hash>] Returns proc info for all jobs.
+ # @return [Array<Hash>]
+ # Returns info for all jobs.
def jobs
@jobs.map { |cjob| job( cjob['pid'] ) }.compact
end
+ # @return [Array<Hash>]
+ # Returns info for all running jobs.
#
- # @return [Array<Hash>] Returns proc info for all running jobs.
- #
# @see #jobs
- #
def running_jobs
- jobs.reject { |job| job['proc'].empty? }
+ jobs.select { |job| job['alive'] }
end
+ # @return [Array<Hash>]
+ # Returns info for all finished jobs.
#
- # @return [Array<Hash>] Returns proc info for all finished jobs.
- #
# @see #jobs
- #
def finished_jobs
- jobs.select { |job| job['proc'].empty? }
+ jobs.reject { |job| job['alive'] }
end
# @return [Float]
# Workload score for this Dispatcher, calculated using the number
# of {#running_jobs} and the configured node weight.
#
# Lower is better.
- #
def workload_score
score = (running_jobs.size + 1).to_f
score *= @node.info['weight'].to_f if @node.info['weight']
score
end
# @return [Hash]
# Returns server stats regarding the jobs and pool.
- def stats
+ def statistics
stats_h = {
'running_jobs' => running_jobs,
'finished_jobs' => finished_jobs,
- 'init_pool_size' => @opts.pool_size,
+ 'init_pool_size' => @options.dispatcher.pool_size,
'curr_pool_size' => @pool.size,
- 'consumed_pids' => @consumed_pids
+ 'consumed_pids' => @consumed_pids,
+ 'snapshots' => Dir.glob( "#{@options.snapshot.save_path}*.afs" )
}
stats_h.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours )
stats_h['node']['score'] = workload_score
stats_h
end
- # @return [String] contents of the log file
+ # @return [String]
+ # Contents of the log file
def log
IO.read prep_logging
end
- # @return [Hash] the server's proc info
- def proc_info
- proc_hash( Process.pid ).merge( 'node' => @node.info )
+ # @private
+ def pid
+ Process.pid
end
private
- def self._handlers
- @handlers ||= nil
- return @handlers if @handlers
+ def self._services
+ @services ||= nil
+ return @services if @services
- @handlers = Component::Manager.new( Options.dir['rpcd_handlers'], HANDLER_NAMESPACE )
- @handlers.load_all
- @handlers
+ @services = Component::Manager.new( Options.paths.services, SERVICE_NAMESPACE )
+ @services.load_all
+ @services
end
- def _handlers
- self.class._handlers
+ def _services
+ self.class._services
end
- #
- # Outputs the Arachni banner.<br/>
- # Displays version number, revision number, author details etc.
- #
- def banner
- puts BANNER
- puts
- puts
- end
-
- def print_help
- puts <<USAGE
- Usage: arachni_rpcd \[options\]
-
- Supported options:
-
- -h
- --help output this
-
- --address=<host> specify address to bind to
- (Default: #{@opts.rpc_address})
-
- --external-address=<host> specify the external address used to access this Dispatcher
- (Defaults to the value of '--address'.)
-
- --port=<num> specify port to listen to
- (Default: #{@opts.rpc_port})
-
- --port-range=<beginning>-<end>
-
- specify port range for the RPC instances
- (Make sure to allow for a few hundred ports.)
- (Default: #{@opts.rpc_instance_port_range.join( '-' )})
-
- --reroute-to-logfile reroute all output to a logfile under 'logs/'
-
- --pool-size=<num> how many server workers/processes should be available
- at any given moment (Default: #{@opts.pool_size})
-
- --neighbour=<URL> URL of a neighbouring Dispatcher (used to build a grid)
-
- --weight=<float> weight of the Dispatcher
-
- --pipe-id=<string> bandwidth pipe identification
-
- --nickname=<string> nickname of the Dispatcher
-
- --debug
-
-
- SSL --------------------------
-
- (All SSL options will be honored by the dispatched RPC instances as well.)
- (Do *not* use encrypted keys!)
-
- --ssl-pkey <file> location of the server SSL private key (.pem)
- (Used to verify the server to the clients.)
-
- --ssl-cert <file> location of the server SSL certificate (.pem)
- (Used to verify the server to the clients.)
-
- --node-ssl-pkey <file> location of the client SSL private key (.pem)
- (Used to verify this node to other servers.)
-
- --node-ssl-cert <file> location of the client SSL certificate (.pem)
- (Used to verify this node to other servers.)
-
- --ssl-ca <file> location of the CA certificate (.pem)
-
-USAGE
- end
-
-
def trap_interrupts( &block )
%w(QUIT INT).each do |signal|
trap( signal, &block || Proc.new{ } ) if Signal.list.has_key?( signal )
end
end
# Starts the dispatcher's server
def run
- print_status 'Starting the server...'
+ print_status 'Ready'
@server.start
rescue => e
- print_error e.to_s
- print_error_backtrace e
+ print_exception e
$stderr.puts "Could not start server, for details see: #{@logfile}"
# If the server fails to start kill the pool Instances
# to prevent zombie processes.
@@ -379,11 +297,11 @@
exit 1
end
def shutdown
print_status 'Shutting down...'
- @server.shutdown
+ Arachni::Reactor.global.stop
end
def kill( pid )
begin
10.times { Process.kill( 'KILL', pid ) }
@@ -391,53 +309,49 @@
rescue Errno::ESRCH
return true
end
end
- def add_instance_to_pool
+ def add_instance_to_pool( one_at_a_time = true )
+ return if @operation_in_progress && one_at_a_time
+ @operation_in_progress = true
+
owner = 'dispatcher'
- exception_jail {
+ port = available_port
+ token = generate_token
- # get an available port for the child
- port = available_port
- token = generate_token
+ pid = Processes::Manager.spawn( :instance, port: port, token: token )
+ Process.detach( pid )
+ @consumed_pids << pid
- pid = fork do
- @opts.rpc_port = port
- Server::Instance.new( @opts, token )
- end
+ print_status "Instance added to pool -- PID: #{pid} - " +
+ "Port: #{port} - Owner: #{owner}"
- print_status "Instance added to pool -- PID: #{pid} - " +
- "Port: #{port} - Owner: #{owner}"
+ url = "#{@options.dispatcher.external_address}:#{port}"
+ # Wait until the Instance has booted before adding it to the pool.
+ Client::Instance.when_ready( url, token ) do
+ @operation_in_progress = false
+
@pool << {
'token' => token,
'pid' => pid,
'port' => port,
- 'url' => "#{@opts.rpc_external_address}:#{port}",
+ 'url' => url,
'owner' => owner,
- 'birthdate' => Time.now
+ 'birthdate' => Time.now.to_s
}
-
- @consumed_pids << pid
-
- # let the child go about its business
- Process.detach( pid )
- }
+ end
end
def prep_logging
# reroute all output to a logfile
- @logfile ||= reroute_to_file( @opts.dir['logs'] +
- "/Dispatcher - #{Process.pid}-#{@opts.rpc_port}.log" )
+ @logfile ||= reroute_to_file( @options.paths.logs +
+ "/Dispatcher - #{Process.pid}-#{@options.rpc.server_port}.log" )
end
- def proc_hash( pid )
- struct_to_h( ProcTable.ps( pid ) )
- end
-
def connect_to_peer( url )
- Client::Dispatcher.new( @opts, url )
+ Client::Dispatcher.new( @options, url )
end
def struct_to_h( struct )
hash = {}
return hash if !struct