lib/metasploit/aggregator.rb in metasploit-aggregator-0.1.2 vs lib/metasploit/aggregator.rb in metasploit-aggregator-0.1.3
- old
+ new
@@ -1,12 +1,13 @@
require 'socket'
require 'openssl'
require 'thread'
-require 'msgpack'
-require 'msgpack/rpc'
require 'securerandom'
+require 'metasploit/aggregator/error'
+require 'metasploit/aggregator/messages_pb'
+require 'metasploit/aggregator/aggregator_services_pb'
require 'metasploit/aggregator/version'
require 'metasploit/aggregator/cable'
require 'metasploit/aggregator/connection_manager'
require 'metasploit/aggregator/https_forwarder'
require 'metasploit/aggregator/http'
@@ -19,10 +20,15 @@
# return availability status of the service
def available?
# index for impl
end
+ # return the current service version found
+ def version
+ Metasploit::Aggregator::VERSION
+ end
+
# returns map of sessions available from the service
def sessions
# index for impl
end
@@ -78,138 +84,366 @@
end
end
class ServerProxy < Service
attr_reader :uuid
+ @exit_lock = Mutex.new
@host = @port = @socket = nil
- @response_queue = []
+ @no_params = nil
+ @response_queue = nil
+ @listening_thread = nil
+ @cleanup_list = nil
def initialize(host, port)
@host = host
@port = port
- @client = MessagePack::RPC::Client.new(@host, @port)
+ @client = Metasploit::Aggregator::Pb::Stub.new("#{@host}:#{@port}", :this_channel_is_insecure)
+ # TODO: add arg{ :channel_override => Core::Channel } to control connection
@uuid = SecureRandom.uuid
+ @no_params = Metasploit::Aggregator::Message::No_params.new
+ # server_version = pb_to_array(@client.version(@no_params).value)[0]
+ # raise CompatibilityError("server version mis-match found #{server_version}") unless server_version == version
end
def available?
- @client.call(:available?)
- rescue MessagePack::RPC::ConnectionTimeoutError => e
- false
+ @client.available(@no_params).answer
end
def sessions
- @client.call(:sessions)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ pb_to_map(@client.sessions(@no_params).map)
end
def cables
- @client.call(:cables)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ pb_to_array(@client.cables(@no_params).value)
end
def obtain_session(payload, uuid)
- @client.call(:obtain_session, payload, uuid)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ args = Metasploit::Aggregator::Message::String_array.new( value: [payload, uuid] )
+ @client.obtain_session(args).answer
end
def release_session(payload)
- @client.call(:release_session, payload)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ args = Metasploit::Aggregator::Message::String_array.new( value: [payload] )
+ @client.release_session(args).answer
end
def session_details(payload)
- @client.call(:session_details, payload)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ args = Metasploit::Aggregator::Message::String_array.new( value: [payload] )
+ pb_to_map(@client.session_details(args).map)
end
def add_cable(type, host, port, certificate = nil)
- @client.call(:add_cable, type, host, port, certificate)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ args = nil
+ if certificate.nil?
+ args = Metasploit::Aggregator::Message::Cable_def.new( type: type, host: host, port: port.to_i )
+ else
+ args = Metasploit::Aggregator::Message::Cable_def.new( type: type, host: host, port: port.to_i, pem: certificate )
+ end
+ @client.add_cable(args).answer
end
def remove_cable(host, port)
- @client.call(:remove_cable, host, port)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ args = Metasploit::Aggregator::Message::String_array.new( value: [host, port] )
+ @client.remove_cable(args).answer
end
def register_default(uuid, payload_list)
- @client.call(:register_default, uuid, payload_list)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ uuid = "" if uuid.nil?
+ payloads = []
+ payloads = payload + payload_list unless payload_list.nil?
+ args = Metasploit::Aggregator::Message::Register.new( uuid: uuid, payloads: payloads )
+ @client.register_default(args).answer
end
def default
- @client.call(:default)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ pb_to_array(@client.default(@no_params).value)[0]
end
def available_addresses
- @client.call(:available_addresses)
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
+ pb_to_array(@client.available_addresses(@no_params).value)
end
- def stop
- @client.close
- @client = nil
+ def stop(force = false)
+ # end the response queue
+ ServerProxy.unregister_for_cleanup(self) unless force
+ @response_queue.push(self) unless @response_queue.nil?
+
@listening_thread.join if @listening_thread
+ @listening_thread = nil
+ @client = nil
end
def register_response_channel(requester)
unless requester.kind_of? Metasploit::Aggregator::Http::Requester
raise ArgumentError("response channel class invalid")
end
@response_io = requester
- start_responding
+ process
end
- def start_responding
+ protected
+
+ def self.register_for_cleanup(connection)
+ @exit_lock.synchronize do
+ unless @cleanup_list
+ @cleanup_list = ::Set.new
+ at_exit { ServerProxy.run_cleanup }
+ end
+ @cleanup_list.add connection
+ end
+ end
+
+ def self.unregister_for_cleanup(connection)
+ @exit_lock.synchronize do
+ @cleanup_list.delete connection if @cleanup_list
+ end
+ end
+
+ def self.run_cleanup
+ @exit_lock.synchronize do
+ if @cleanup_list
+ @cleanup_list.each do |connection|
+ connection.stop(true)
+ end
+ end
+ end
+ end
+
+ private
+
+ def pb_to_map(map)
+ result = {}
+ map.each do |key , value|
+ result[key] = value
+ end
+ result
+ end
+
+ def pb_to_array(array)
+ result = []
+ array.each do |value|
+ result << value
+ end
+ result
+ end
+
+ def process
+ @response_queue = EnumeratorQueue.new(self)
+ requests = @client.process(@response_queue.each_item)
+
+ # add initial key response with only local uuid
+ initial_response = Metasploit::Aggregator::Message::Response.new( uuid: @uuid )
+ @response_queue.push(initial_response)
+
@listening_thread = Thread.new do
- @listener_client = MessagePack::RPC::Client.new(@host, @port) unless @listener_client
- while @client
- begin
- sleep 0.1 # polling for now need
- result, result_obj, session_id, response_obj = nil
- result = @listener_client.call(:request, @uuid)
- next unless result # just continue to poll if no request is found
- result_obj = Metasploit::Aggregator::Http::Request.from_msgpack(result)
- session_id = Metasploit::Aggregator::Http::Request.parse_uri(result_obj)
- response_obj = @response_io.process_request(result_obj)
- @listener_client.call(:respond, session_id, response_obj.to_msgpack)
- rescue MessagePack::RPC::TimeoutError
- next
- rescue
- Logger.log $!
+ requests.each do |pb_request|
+ request = Metasploit::Aggregator::Http::Request.new(pb_to_array(pb_request.headers), pb_request.body, nil)
+ response = @response_io.process_request(request)
+ session_id = Metasploit::Aggregator::Http::Request.parse_uri(request)
+ pb_request = Metasploit::Aggregator::Message::Request.new( headers: response.headers, body: response.body )
+ pb_response = Metasploit::Aggregator::Message::Response.new( uuid: session_id, response: pb_request)
+ @response_queue.push(pb_response)
+ end
+ end
+ ServerProxy.register_for_cleanup self
+ end
+ end
+
+ # A EnumeratorQueue wraps a Queue to yield the items added to it.
+ class EnumeratorQueue
+ extend Forwardable
+ def_delegators :@q, :push
+
+ def initialize(sentinel)
+ @q = Queue.new
+ @sentinel = sentinel
+ end
+
+ def each_item
+ return enum_for(:each_item) unless block_given?
+ loop do
+ r = @q.pop
+ break if r.equal?(@sentinel)
+ fail r if r.is_a? Exception
+ yield r
+ end
+ end
+ end
+
+ class ServerImpl < Metasploit::Aggregator::Pb::Service
+
+ def initialize
+ super
+ @local_server = Server.new
+ @requestThreads = {}
+ @listeners = []
+ end
+
+ def available(_no_params, _unused_call)
+ Metasploit::Aggregator::Message::Result.new( answer: @local_server.available? )
+ end
+
+ def version(_no_params, _unused_call)
+ Metasploit::Aggregator::Message::String_array.new( value: [ @local_server.version ] )
+ end
+
+ def sessions(_no_parms, _unused_call)
+ Metasploit::Aggregator::Message::Result_map.new( map: @local_server.sessions() )
+ end
+
+ def cables(_no_parms, _unused_call)
+ Metasploit::Aggregator::Message::String_array.new( value: @local_server.cables() )
+ end
+
+ def obtain_session(args, _unused_call)
+ payload, uuid = args.value
+ Metasploit::Aggregator::Message::Result.new( answer: @local_server.obtain_session(payload, uuid) )
+ end
+
+ def release_session(args, _unused_call)
+ payload = args.value.shift
+ Metasploit::Aggregator::Message::Result.new( answer: @local_server.release_session(payload) )
+ end
+
+ def session_details(args, _unused_call)
+ payload = args.value.shift
+ Metasploit::Aggregator::Message::Result_map.new( map: @local_server.session_details(payload) )
+ end
+
+ def add_cable(cable, _unused_call)
+ pem = nil
+ pem = cable.pem unless cable.pem.empty?
+ result = @local_server.add_cable(cable.type, cable.host, cable.port, pem)
+ Metasploit::Aggregator::Message::Result.new( answer: result )
+ end
+
+ def remove_cable(args, _unused_call)
+ host, port = args.value
+ result = @local_server.remove_cable(host, port)
+ Metasploit::Aggregator::Message::Result.new( answer: result )
+ end
+
+ def register_default(register, _unused_call)
+ payloads = nil
+ payloads = register.payloads unless register.payloads.empty?
+ result = @local_server.register_default(register.uuid, payloads)
+ Metasploit::Aggregator::Message::Result.new( answer: result )
+ end
+
+ def default(_no_params, _unused_call)
+ uuid = @local_server.default
+ return Metasploit::Aggregator::Message::String_array.new( value: [ uuid ] ) unless uuid.nil?
+ Metasploit::Aggregator::Message::String_array.new()
+ end
+
+ def available_addresses(_no_params, _unused_call)
+ addresses = @local_server.available_addresses
+ Metasploit::Aggregator::Message::String_array.new( value: addresses )
+ end
+
+ def process(responses)
+ requests = EnumeratorQueue.new(self)
+ uuid = nil
+
+ requestingThread = Thread.new do
+ loop do
+ sleep 0.1 # outer loop only occurs until uuid is set
+ next if uuid.nil?
+ request = @local_server.request(uuid)
+ # TODO: with this in place we can just get the request queue and pop each item to process and forward
+ unless request.nil?
+ body = ""
+ body = request.body unless request.body.nil?
+ pb_request = Metasploit::Aggregator::Message::Request.new( headers: request.headers, body: body )
+ requests.push(pb_request)
end
end
- @listener_client.close
end
+
+ Thread.new do
+ responses.each do |response|
+ uuid = response.uuid if uuid.nil?
+ next if response.response.nil?
+ request_pb = response.response
+ request = Metasploit::Aggregator::Http::Request.new(request_pb.headers, request_pb.body, nil)
+ @local_server.respond(response.uuid, request)
+ end
+ requestingThread.exit
+ requestingThread.join
+ requests.push(self)
+ end
+
+ requests.each_item
end
- end # ServerProxy
+ end
+ class GrpcServer
+ @exit_lock = Mutex.new
+
+ def initialize(host, port)
+ @host = host
+ @port = port
+
+ # TODO: investigate using Core::Channel to secure this communication
+ # server = TCPServer.new(@host, @port)
+ # sslContext = OpenSSL::SSL::SSLContext.new
+ # sslContext.key, sslContext.cert = Metasploit::Aggregator::ConnectionManager.ssl_generate_certificate
+ # sslServer = OpenSSL::SSL::SSLServer.new(server, sslContext)
+
+ @svr = GRPC::RpcServer.new
+ @svr.add_http2_port("#{@host}:#{@port}", :this_port_is_insecure)
+ @svr.handle(ServerImpl)
+
+ @exec = Thread.new do
+ GrpcServer.register_for_cleanup(self)
+ @svr.run_till_terminated
+ end
+ end
+
+ def stop(force = false)
+ GrpcServer.unregister_for_cleanup(self) unless force
+ @svr.stop if @svr.running?
+ end
+
+ protected
+
+ def self.register_for_cleanup(connection)
+ @exit_lock.synchronize do
+ unless @cleanup_list
+ @cleanup_list = ::Set.new
+ at_exit { GrpcServer.run_cleanup }
+ end
+ @cleanup_list.add connection
+ end
+ end
+
+ def self.unregister_for_cleanup(connection)
+ @exit_lock.synchronize do
+ @cleanup_list.delete connection if @cleanup_list
+ end
+ end
+
+ def self.run_cleanup
+ @exit_lock.synchronize do
+ if @cleanup_list
+ @cleanup_list.each do |connection|
+ connection.stop(true)
+ end
+ end
+ end
+ end
+
+ end
+
class Server < Service
- # include Metasploit::Aggregator::ConnectionManager
def initialize
- @manager = nil
@router = Router.instance
+ @manager = ConnectionManager.instance
end
- def start
- @manager = Metasploit::Aggregator::ConnectionManager.new
- true
- end
-
def available?
!@manager.nil?
end
def sessions
@@ -264,11 +498,11 @@
@manager.register_forward(uuid, payload_list)
true
end
def default
- send, recv, console = @router.get_forward('default')
+ _send, _recv, console = @router.get_forward('default')
console
end
def available_addresses
addr_list = Socket.ip_address_list
@@ -287,20 +521,19 @@
true
end
def request(uuid)
# return requests here
- result = nil
- send, recv = @router.reverse_route(uuid)
+ send, _recv = @router.reverse_route(uuid)
if send.length > 0
result = send.pop
end
result
end
def respond(uuid, data)
- send, recv = @router.get_forward(uuid)
+ _send, recv = @router.get_forward(uuid)
recv << data unless recv.nil?
true
end
def register_response_channel(io)
@@ -308,74 +541,7 @@
response = "register_response_channel not implemented on server"
Logger.log response
response
end
end # class Server
-
- # wrapping class required to avoid MsgPack specific needs to parallel request processing.
- class AsyncMsgPackServer < Server
-
- def initialize
- super
- end
-
- # MsgPack specific wrapper for listener due to lack of parallel processing
- def request(uuid)
- result = super(uuid)
- sendMsg = nil
- if result
- begin
- sendMsg = result.to_msgpack
- rescue Exception => e
- Logger.log e.backtrace
- # when an error occurs here we should likely respond with an error of some sort to remove block on response
- end
- end
- sendMsg
- end
-
- # MsgPack specific wrapper for listener due to lack of parallel processing
- def respond(uuid, data)
- begin
- result = super(uuid, Metasploit::Aggregator::Http::Request.from_msgpack(data))
- result
- rescue Exception => e
- Logger.log e.backtrace
- end
- end
- end # AsyncMsgPackServer
-
- class MsgPackServer
-
- def initialize(host, port)
- @host = host
- @port = port
-
- # server = TCPServer.new(@host, @port)
- # sslContext = OpenSSL::SSL::SSLContext.new
- # sslContext.key, sslContext.cert = Metasploit::Aggregator::ConnectionManager.ssl_generate_certificate
- # sslServer = OpenSSL::SSL::SSLServer.new(server, sslContext)
- #
- @svr = MessagePack::RPC::Server.new # need to initialize this as ssl server
- # @svr.listen(sslServer, Server.new)
- @svr.listen(@host, @port, AsyncMsgPackServer.new)
-
- Thread.new { @svr.run }
- end
-
- def start
- c = MessagePack::RPC::Client.new(@host,@port)
- c.call(:start)
- c.close
- rescue MessagePack::RPC::TimeoutError => e
- Logger.log(e.to_s)
- end
-
- def stop
- c = MessagePack::RPC::Client.new(@host,@port)
- c.call(:stop)
- c.close
- @svr.close
- end
- end
end
end