lib/marvin/distributed/client.rb in marvin-0.8.0.0 vs lib/marvin/distributed/client.rb in marvin-0.8.0.1

- old
+ new

@@ -24,27 +24,37 @@ def host_with_port @host_with_port end - def method_missing(name, *args) + def method_missing(name, *args, &blk) logger.debug "Proxying #{name}(#{args.inspect[1..-2]}) to #{@host_with_port}" + cb = nil + if blk.present? + cb = proc do |_, options| + if options.is_a?(Hash) + value = options.delete("return-value") + blk.call(value) + end + end + end @connection.send_message(:action, { "action" => name.to_s, "arguments" => args, "client-host" => @host_with_port - }) + }, &cb) end end class EMConnection < Marvin::Distributed::Protocol register_handler_method :event register_handler_method :authentication_failed register_handler_method :authenticated register_handler_method :unauthorized + register_handler_method :welcome cattr_accessor :stopping self.stopping = false attr_accessor :client, :port, :connection_host, :connection_port, :configuration @@ -58,24 +68,17 @@ end def post_init super logger.info "Connected to distributed server" - if should_use_tls? - logger.info "Attempting to initialize tls" - start_tls - else - process_authentication - end end - def ssl_handshake_completed - logger.info "tls handshake completed" - process_authentication if should_use_tls? + def post_connect + logger.info "Connection started; processing authentication" + process_authentication end - def unbind if self.stopping logger.info "Stopping distributed client" else logger.info "Lost connection to distributed client - Scheduling reconnect" @@ -89,10 +92,19 @@ logger.info "Attempting to authenticate..." send_message(:authenticate, {:token => configuration.token}) end end + def handle_welcome(options = {}) + if should_use_ssl? && !ssl_enabled? + request_ssl! + else + @connected = true + post_connect + end + end + def handle_event(options = {}) event = options["event-name"] client_host = options["client-host"] client_nick = options["client-nick"] options = options["event-options"] @@ -140,37 +152,12 @@ c.connection_host = host c.connection_port = port end end - protected - - def options_for_callback(blk) - return {} if blk.blank? - cb_id = "callback-#{seld.object_id}-#{Time.now.to_f}" - count = 0 - count += 1 while @callbacks.has_key?(Digest::SHA256.hexdigest("#{cb_id}-#{count}")) - final_id = Digest::SHA256.hexdigest("#{cb_id}-#{count}") - @callbacks[final_id] = blk - {"callback-id" => final_id} - end - - def process_callback(hash) - if hash.is_a?(Hash) && hash.has_key?("callback-id") - callback = @callbacks.delete(hash["callback-id"]) - callback.call(self, hash) - end - end - - def host_with_port - @host_with_port ||= begin - port, ip = Socket.unpack_sockaddr_in(get_peername) - "#{ip}:#{port}" - end - end - - def should_use_tls? - @using_tls ||= configuration.encrypted? + def request_ssl! + logger.info "Requesting SSL for Distributed Client" + send_message(:enable_ssl) unless ssl_enabled? end end def initialize(em_connection) \ No newline at end of file