lib/metasploit/aggregator.rb in metasploit-aggregator-0.1.2 vs lib/metasploit/aggregator.rb in metasploit-aggregator-0.1.3

- old
+ new

@@ -1,12 +1,13 @@ require 'socket' require 'openssl' require 'thread' -require 'msgpack' -require 'msgpack/rpc' require 'securerandom' +require 'metasploit/aggregator/error' +require 'metasploit/aggregator/messages_pb' +require 'metasploit/aggregator/aggregator_services_pb' require 'metasploit/aggregator/version' require 'metasploit/aggregator/cable' require 'metasploit/aggregator/connection_manager' require 'metasploit/aggregator/https_forwarder' require 'metasploit/aggregator/http' @@ -19,10 +20,15 @@ # return availability status of the service def available? # index for impl end + # return the current service version found + def version + Metasploit::Aggregator::VERSION + end + # returns map of sessions available from the service def sessions # index for impl end @@ -78,138 +84,366 @@ end end class ServerProxy < Service attr_reader :uuid + @exit_lock = Mutex.new @host = @port = @socket = nil - @response_queue = [] + @no_params = nil + @response_queue = nil + @listening_thread = nil + @cleanup_list = nil def initialize(host, port) @host = host @port = port - @client = MessagePack::RPC::Client.new(@host, @port) + @client = Metasploit::Aggregator::Pb::Stub.new("#{@host}:#{@port}", :this_channel_is_insecure) + # TODO: add arg{ :channel_override => Core::Channel } to control connection @uuid = SecureRandom.uuid + @no_params = Metasploit::Aggregator::Message::No_params.new + # server_version = pb_to_array(@client.version(@no_params).value)[0] + # raise CompatibilityError("server version mis-match found #{server_version}") unless server_version == version end def available? - @client.call(:available?) - rescue MessagePack::RPC::ConnectionTimeoutError => e - false + @client.available(@no_params).answer end def sessions - @client.call(:sessions) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + pb_to_map(@client.sessions(@no_params).map) end def cables - @client.call(:cables) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + pb_to_array(@client.cables(@no_params).value) end def obtain_session(payload, uuid) - @client.call(:obtain_session, payload, uuid) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + args = Metasploit::Aggregator::Message::String_array.new( value: [payload, uuid] ) + @client.obtain_session(args).answer end def release_session(payload) - @client.call(:release_session, payload) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + args = Metasploit::Aggregator::Message::String_array.new( value: [payload] ) + @client.release_session(args).answer end def session_details(payload) - @client.call(:session_details, payload) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + args = Metasploit::Aggregator::Message::String_array.new( value: [payload] ) + pb_to_map(@client.session_details(args).map) end def add_cable(type, host, port, certificate = nil) - @client.call(:add_cable, type, host, port, certificate) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + args = nil + if certificate.nil? + args = Metasploit::Aggregator::Message::Cable_def.new( type: type, host: host, port: port.to_i ) + else + args = Metasploit::Aggregator::Message::Cable_def.new( type: type, host: host, port: port.to_i, pem: certificate ) + end + @client.add_cable(args).answer end def remove_cable(host, port) - @client.call(:remove_cable, host, port) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + args = Metasploit::Aggregator::Message::String_array.new( value: [host, port] ) + @client.remove_cable(args).answer end def register_default(uuid, payload_list) - @client.call(:register_default, uuid, payload_list) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + uuid = "" if uuid.nil? + payloads = [] + payloads = payload + payload_list unless payload_list.nil? + args = Metasploit::Aggregator::Message::Register.new( uuid: uuid, payloads: payloads ) + @client.register_default(args).answer end def default - @client.call(:default) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + pb_to_array(@client.default(@no_params).value)[0] end def available_addresses - @client.call(:available_addresses) - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) + pb_to_array(@client.available_addresses(@no_params).value) end - def stop - @client.close - @client = nil + def stop(force = false) + # end the response queue + ServerProxy.unregister_for_cleanup(self) unless force + @response_queue.push(self) unless @response_queue.nil? + @listening_thread.join if @listening_thread + @listening_thread = nil + @client = nil end def register_response_channel(requester) unless requester.kind_of? Metasploit::Aggregator::Http::Requester raise ArgumentError("response channel class invalid") end @response_io = requester - start_responding + process end - def start_responding + protected + + def self.register_for_cleanup(connection) + @exit_lock.synchronize do + unless @cleanup_list + @cleanup_list = ::Set.new + at_exit { ServerProxy.run_cleanup } + end + @cleanup_list.add connection + end + end + + def self.unregister_for_cleanup(connection) + @exit_lock.synchronize do + @cleanup_list.delete connection if @cleanup_list + end + end + + def self.run_cleanup + @exit_lock.synchronize do + if @cleanup_list + @cleanup_list.each do |connection| + connection.stop(true) + end + end + end + end + + private + + def pb_to_map(map) + result = {} + map.each do |key , value| + result[key] = value + end + result + end + + def pb_to_array(array) + result = [] + array.each do |value| + result << value + end + result + end + + def process + @response_queue = EnumeratorQueue.new(self) + requests = @client.process(@response_queue.each_item) + + # add initial key response with only local uuid + initial_response = Metasploit::Aggregator::Message::Response.new( uuid: @uuid ) + @response_queue.push(initial_response) + @listening_thread = Thread.new do - @listener_client = MessagePack::RPC::Client.new(@host, @port) unless @listener_client - while @client - begin - sleep 0.1 # polling for now need - result, result_obj, session_id, response_obj = nil - result = @listener_client.call(:request, @uuid) - next unless result # just continue to poll if no request is found - result_obj = Metasploit::Aggregator::Http::Request.from_msgpack(result) - session_id = Metasploit::Aggregator::Http::Request.parse_uri(result_obj) - response_obj = @response_io.process_request(result_obj) - @listener_client.call(:respond, session_id, response_obj.to_msgpack) - rescue MessagePack::RPC::TimeoutError - next - rescue - Logger.log $! + requests.each do |pb_request| + request = Metasploit::Aggregator::Http::Request.new(pb_to_array(pb_request.headers), pb_request.body, nil) + response = @response_io.process_request(request) + session_id = Metasploit::Aggregator::Http::Request.parse_uri(request) + pb_request = Metasploit::Aggregator::Message::Request.new( headers: response.headers, body: response.body ) + pb_response = Metasploit::Aggregator::Message::Response.new( uuid: session_id, response: pb_request) + @response_queue.push(pb_response) + end + end + ServerProxy.register_for_cleanup self + end + end + + # A EnumeratorQueue wraps a Queue to yield the items added to it. + class EnumeratorQueue + extend Forwardable + def_delegators :@q, :push + + def initialize(sentinel) + @q = Queue.new + @sentinel = sentinel + end + + def each_item + return enum_for(:each_item) unless block_given? + loop do + r = @q.pop + break if r.equal?(@sentinel) + fail r if r.is_a? Exception + yield r + end + end + end + + class ServerImpl < Metasploit::Aggregator::Pb::Service + + def initialize + super + @local_server = Server.new + @requestThreads = {} + @listeners = [] + end + + def available(_no_params, _unused_call) + Metasploit::Aggregator::Message::Result.new( answer: @local_server.available? ) + end + + def version(_no_params, _unused_call) + Metasploit::Aggregator::Message::String_array.new( value: [ @local_server.version ] ) + end + + def sessions(_no_parms, _unused_call) + Metasploit::Aggregator::Message::Result_map.new( map: @local_server.sessions() ) + end + + def cables(_no_parms, _unused_call) + Metasploit::Aggregator::Message::String_array.new( value: @local_server.cables() ) + end + + def obtain_session(args, _unused_call) + payload, uuid = args.value + Metasploit::Aggregator::Message::Result.new( answer: @local_server.obtain_session(payload, uuid) ) + end + + def release_session(args, _unused_call) + payload = args.value.shift + Metasploit::Aggregator::Message::Result.new( answer: @local_server.release_session(payload) ) + end + + def session_details(args, _unused_call) + payload = args.value.shift + Metasploit::Aggregator::Message::Result_map.new( map: @local_server.session_details(payload) ) + end + + def add_cable(cable, _unused_call) + pem = nil + pem = cable.pem unless cable.pem.empty? + result = @local_server.add_cable(cable.type, cable.host, cable.port, pem) + Metasploit::Aggregator::Message::Result.new( answer: result ) + end + + def remove_cable(args, _unused_call) + host, port = args.value + result = @local_server.remove_cable(host, port) + Metasploit::Aggregator::Message::Result.new( answer: result ) + end + + def register_default(register, _unused_call) + payloads = nil + payloads = register.payloads unless register.payloads.empty? + result = @local_server.register_default(register.uuid, payloads) + Metasploit::Aggregator::Message::Result.new( answer: result ) + end + + def default(_no_params, _unused_call) + uuid = @local_server.default + return Metasploit::Aggregator::Message::String_array.new( value: [ uuid ] ) unless uuid.nil? + Metasploit::Aggregator::Message::String_array.new() + end + + def available_addresses(_no_params, _unused_call) + addresses = @local_server.available_addresses + Metasploit::Aggregator::Message::String_array.new( value: addresses ) + end + + def process(responses) + requests = EnumeratorQueue.new(self) + uuid = nil + + requestingThread = Thread.new do + loop do + sleep 0.1 # outer loop only occurs until uuid is set + next if uuid.nil? + request = @local_server.request(uuid) + # TODO: with this in place we can just get the request queue and pop each item to process and forward + unless request.nil? + body = "" + body = request.body unless request.body.nil? + pb_request = Metasploit::Aggregator::Message::Request.new( headers: request.headers, body: body ) + requests.push(pb_request) end end - @listener_client.close end + + Thread.new do + responses.each do |response| + uuid = response.uuid if uuid.nil? + next if response.response.nil? + request_pb = response.response + request = Metasploit::Aggregator::Http::Request.new(request_pb.headers, request_pb.body, nil) + @local_server.respond(response.uuid, request) + end + requestingThread.exit + requestingThread.join + requests.push(self) + end + + requests.each_item end - end # ServerProxy + end + class GrpcServer + @exit_lock = Mutex.new + + def initialize(host, port) + @host = host + @port = port + + # TODO: investigate using Core::Channel to secure this communication + # server = TCPServer.new(@host, @port) + # sslContext = OpenSSL::SSL::SSLContext.new + # sslContext.key, sslContext.cert = Metasploit::Aggregator::ConnectionManager.ssl_generate_certificate + # sslServer = OpenSSL::SSL::SSLServer.new(server, sslContext) + + @svr = GRPC::RpcServer.new + @svr.add_http2_port("#{@host}:#{@port}", :this_port_is_insecure) + @svr.handle(ServerImpl) + + @exec = Thread.new do + GrpcServer.register_for_cleanup(self) + @svr.run_till_terminated + end + end + + def stop(force = false) + GrpcServer.unregister_for_cleanup(self) unless force + @svr.stop if @svr.running? + end + + protected + + def self.register_for_cleanup(connection) + @exit_lock.synchronize do + unless @cleanup_list + @cleanup_list = ::Set.new + at_exit { GrpcServer.run_cleanup } + end + @cleanup_list.add connection + end + end + + def self.unregister_for_cleanup(connection) + @exit_lock.synchronize do + @cleanup_list.delete connection if @cleanup_list + end + end + + def self.run_cleanup + @exit_lock.synchronize do + if @cleanup_list + @cleanup_list.each do |connection| + connection.stop(true) + end + end + end + end + + end + class Server < Service - # include Metasploit::Aggregator::ConnectionManager def initialize - @manager = nil @router = Router.instance + @manager = ConnectionManager.instance end - def start - @manager = Metasploit::Aggregator::ConnectionManager.new - true - end - def available? !@manager.nil? end def sessions @@ -264,11 +498,11 @@ @manager.register_forward(uuid, payload_list) true end def default - send, recv, console = @router.get_forward('default') + _send, _recv, console = @router.get_forward('default') console end def available_addresses addr_list = Socket.ip_address_list @@ -287,20 +521,19 @@ true end def request(uuid) # return requests here - result = nil - send, recv = @router.reverse_route(uuid) + send, _recv = @router.reverse_route(uuid) if send.length > 0 result = send.pop end result end def respond(uuid, data) - send, recv = @router.get_forward(uuid) + _send, recv = @router.get_forward(uuid) recv << data unless recv.nil? true end def register_response_channel(io) @@ -308,74 +541,7 @@ response = "register_response_channel not implemented on server" Logger.log response response end end # class Server - - # wrapping class required to avoid MsgPack specific needs to parallel request processing. - class AsyncMsgPackServer < Server - - def initialize - super - end - - # MsgPack specific wrapper for listener due to lack of parallel processing - def request(uuid) - result = super(uuid) - sendMsg = nil - if result - begin - sendMsg = result.to_msgpack - rescue Exception => e - Logger.log e.backtrace - # when an error occurs here we should likely respond with an error of some sort to remove block on response - end - end - sendMsg - end - - # MsgPack specific wrapper for listener due to lack of parallel processing - def respond(uuid, data) - begin - result = super(uuid, Metasploit::Aggregator::Http::Request.from_msgpack(data)) - result - rescue Exception => e - Logger.log e.backtrace - end - end - end # AsyncMsgPackServer - - class MsgPackServer - - def initialize(host, port) - @host = host - @port = port - - # server = TCPServer.new(@host, @port) - # sslContext = OpenSSL::SSL::SSLContext.new - # sslContext.key, sslContext.cert = Metasploit::Aggregator::ConnectionManager.ssl_generate_certificate - # sslServer = OpenSSL::SSL::SSLServer.new(server, sslContext) - # - @svr = MessagePack::RPC::Server.new # need to initialize this as ssl server - # @svr.listen(sslServer, Server.new) - @svr.listen(@host, @port, AsyncMsgPackServer.new) - - Thread.new { @svr.run } - end - - def start - c = MessagePack::RPC::Client.new(@host,@port) - c.call(:start) - c.close - rescue MessagePack::RPC::TimeoutError => e - Logger.log(e.to_s) - end - - def stop - c = MessagePack::RPC::Client.new(@host,@port) - c.call(:stop) - c.close - @svr.close - end - end end end