lib/marvin/distributed/client.rb in marvin-0.8.0.0 vs lib/marvin/distributed/client.rb in marvin-0.8.0.1
- old
+ new
@@ -24,27 +24,37 @@
def host_with_port
@host_with_port
end
- def method_missing(name, *args)
+ def method_missing(name, *args, &blk)
logger.debug "Proxying #{name}(#{args.inspect[1..-2]}) to #{@host_with_port}"
+ cb = nil
+ if blk.present?
+ cb = proc do |_, options|
+ if options.is_a?(Hash)
+ value = options.delete("return-value")
+ blk.call(value)
+ end
+ end
+ end
@connection.send_message(:action, {
"action" => name.to_s,
"arguments" => args,
"client-host" => @host_with_port
- })
+ }, &cb)
end
end
class EMConnection < Marvin::Distributed::Protocol
register_handler_method :event
register_handler_method :authentication_failed
register_handler_method :authenticated
register_handler_method :unauthorized
+ register_handler_method :welcome
cattr_accessor :stopping
self.stopping = false
attr_accessor :client, :port, :connection_host, :connection_port, :configuration
@@ -58,24 +68,17 @@
end
def post_init
super
logger.info "Connected to distributed server"
- if should_use_tls?
- logger.info "Attempting to initialize tls"
- start_tls
- else
- process_authentication
- end
end
- def ssl_handshake_completed
- logger.info "tls handshake completed"
- process_authentication if should_use_tls?
+ def post_connect
+ logger.info "Connection started; processing authentication"
+ process_authentication
end
-
def unbind
if self.stopping
logger.info "Stopping distributed client"
else
logger.info "Lost connection to distributed client - Scheduling reconnect"
@@ -89,10 +92,19 @@
logger.info "Attempting to authenticate..."
send_message(:authenticate, {:token => configuration.token})
end
end
+ def handle_welcome(options = {})
+ if should_use_ssl? && !ssl_enabled?
+ request_ssl!
+ else
+ @connected = true
+ post_connect
+ end
+ end
+
def handle_event(options = {})
event = options["event-name"]
client_host = options["client-host"]
client_nick = options["client-nick"]
options = options["event-options"]
@@ -140,37 +152,12 @@
c.connection_host = host
c.connection_port = port
end
end
- protected
-
- def options_for_callback(blk)
- return {} if blk.blank?
- cb_id = "callback-#{seld.object_id}-#{Time.now.to_f}"
- count = 0
- count += 1 while @callbacks.has_key?(Digest::SHA256.hexdigest("#{cb_id}-#{count}"))
- final_id = Digest::SHA256.hexdigest("#{cb_id}-#{count}")
- @callbacks[final_id] = blk
- {"callback-id" => final_id}
- end
-
- def process_callback(hash)
- if hash.is_a?(Hash) && hash.has_key?("callback-id")
- callback = @callbacks.delete(hash["callback-id"])
- callback.call(self, hash)
- end
- end
-
- def host_with_port
- @host_with_port ||= begin
- port, ip = Socket.unpack_sockaddr_in(get_peername)
- "#{ip}:#{port}"
- end
- end
-
- def should_use_tls?
- @using_tls ||= configuration.encrypted?
+ def request_ssl!
+ logger.info "Requesting SSL for Distributed Client"
+ send_message(:enable_ssl) unless ssl_enabled?
end
end
def initialize(em_connection)
\ No newline at end of file