lib/faye/engines/memory.rb in faye-0.7.2 vs lib/faye/engines/memory.rb in faye-0.8.0

- old
+ new

@@ -1,25 +1,30 @@ module Faye module Engine - class Memory < Base + class Memory include Timeouts - def initialize(options) + def self.create(server, options) + new(server, options) + end + + def initialize(server, options) + @server = server + @options = options @namespace = Namespace.new @clients = {} @channels = {} @messages = {} - super end def create_client(&callback) client_id = @namespace.generate - debug 'Created new client ?', client_id + @server.debug 'Created new client ?', client_id ping(client_id) + @server.trigger(:handshake, client_id) callback.call(client_id) - trigger(:handshake, client_id) end def destroy_client(client_id, &callback) return unless @namespace.exists?(client_id) @@ -28,35 +33,36 @@ end remove_timeout(client_id) @namespace.release(client_id) @messages.delete(client_id) - debug 'Destroyed client ?', client_id + @server.debug 'Destroyed client ?', client_id + @server.trigger(:disconnect, client_id) callback.call if callback - trigger(:disconnect, client_id) end def client_exists(client_id, &callback) callback.call(@namespace.exists?(client_id)) end def ping(client_id) - return unless Numeric === @timeout - debug 'Ping ?, ?', client_id, @timeout + timeout = @server.timeout + return unless Numeric === timeout + @server.debug 'Ping ?, ?', client_id, timeout remove_timeout(client_id) - add_timeout(client_id, 2 * @timeout) { destroy_client(client_id) } + add_timeout(client_id, 2 * timeout) { destroy_client(client_id) } end def subscribe(client_id, channel, &callback) @clients[client_id] ||= Set.new should_trigger = @clients[client_id].add?(channel) @channels[channel] ||= Set.new @channels[channel].add(client_id) - debug 'Subscribed client ? to channel ?', client_id, channel - trigger(:subscribe, client_id, channel) if should_trigger + @server.debug 'Subscribed client ? to channel ?', client_id, channel + @server.trigger(:subscribe, client_id, channel) if should_trigger callback.call(true) if callback end def unsubscribe(client_id, channel, &callback) if @clients.has_key?(client_id) @@ -67,46 +73,40 @@ if @channels.has_key?(channel) @channels[channel].delete(client_id) @channels.delete(channel) if @channels[channel].empty? end - debug 'Unsubscribed client ? from channel ?', client_id, channel - trigger(:unsubscribe, client_id, channel) if should_trigger + @server.debug 'Unsubscribed client ? from channel ?', client_id, channel + @server.trigger(:unsubscribe, client_id, channel) if should_trigger callback.call(true) if callback end - def publish(message) - debug 'Publishing message ?', message + def publish(message, channels) + @server.debug 'Publishing message ?', message - channels = Channel.expand(message['channel']) - clients = Set.new + clients = Set.new channels.each do |channel| next unless subs = @channels[channel] subs.each(&clients.method(:add)) end clients.each do |client_id| - debug 'Queueing for client ?: ?', client_id, message + @server.debug 'Queueing for client ?: ?', client_id, message @messages[client_id] ||= [] @messages[client_id] << Faye.copy_object(message) empty_queue(client_id) end - trigger(:publish, message['clientId'], message['channel'], message['data']) + @server.trigger(:publish, message['clientId'], message['channel'], message['data']) end - private - def empty_queue(client_id) - return unless conn = connection(client_id, false) and - messages = @messages.delete(client_id) - - messages.each(&conn.method(:deliver)) + return unless @server.has_connection?(client_id) + @server.deliver(client_id, @messages[client_id]) + @messages.delete(client_id) end end - - register 'memory', Memory end end