lib/ably/realtime/connection/websocket_transport.rb in ably-0.1.6 vs lib/ably/realtime/connection/websocket_transport.rb in ably-0.2.0

- old
+ new

@@ -2,11 +2,10 @@ class Connection # EventMachine WebSocket transport # @api private class WebsocketTransport < EventMachine::Connection include Ably::Modules::EventEmitter - include Ably::Modules::Conversions extend Ably::Modules::Enum # Valid WebSocket connection states STATE = ruby_enum('STATE', :initialized, @@ -15,32 +14,21 @@ :disconnecting, :disconnected ) include Ably::Modules::StateEmitter - def initialize(connection) + def initialize(connection, url) @connection = connection @state = STATE.Initialized - end + @url = url - # Send object down the WebSocket driver connection as a serialized string/byte array based on protocol - # @param [Object] object to serialize and send to the WebSocket driver - # @api public - def send_object(object) - case client.protocol - when :json - driver.text(object.to_json) - when :msgpack - driver.binary(object.to_msgpack.unpack('c*')) - else - client.logger.error "Unsupported protocol '#{client.protocol}' for serialization, object cannot be serialized and sent to Ably over this WebSocket" - end + setup_event_handlers end # Disconnect the socket transport connection and write all pending text. # If Disconnected state is not automatically triggered, it will be triggered automatically - # @return <void> + # @return [void] # @api public def disconnect close_connection_after_writing change_state STATE.Disconnecting create_timer(2) do @@ -79,17 +67,11 @@ end # URL end point including initialization configuration # {http://www.rubydoc.info/gems/websocket-driver/0.3.5/WebSocket/Driver WebSocket::Driver} interface def url - URI(client.endpoint).tap do |endpoint| - endpoint.query = URI.encode_www_form(client.auth.auth_params.merge( - timestamp: as_since_epoch(Time.now), - format: client.protocol, - echo: client.echo_messages - )) - end.to_s + @url end # {http://www.rubydoc.info/gems/websocket-driver/0.3.5/WebSocket/Driver WebSocket::Driver} interface def write(data) send_data(data) @@ -99,13 +81,48 @@ # i.e. it is not currently connecting or connected def ready_for_release? !connecting? && !connected? end + # @!attribute [r] __incoming_protocol_msgbus__ + # @return [Ably::Util::PubSub] Websocket Transport internal incoming protocol message bus + # @api private + def __incoming_protocol_msgbus__ + @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus + end + + # @!attribute [r] __outgoing_protocol_msgbus__ + # @return [Ably::Util::PubSub] Websocket Transport internal outgoing protocol message bus + # @api private + def __outgoing_protocol_msgbus__ + @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus + end + private attr_reader :connection, :driver + # Send object down the WebSocket driver connection as a serialized string/byte array based on protocol + # @param [Object] object to serialize and send to the WebSocket driver + # @api public + def send_object(object) + case client.protocol + when :json + driver.text(object.to_json) + when :msgpack + driver.binary(object.to_msgpack.unpack('C*')) + else + client.logger.fatal "WebsocketTransport: Unsupported protocol '#{client.protocol}' for serialization, object cannot be serialized and sent to Ably over this WebSocket" + end + end + + def setup_event_handlers + __outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message| + send_object protocol_message + client.logger.debug "WebsocketTransport: Prot msg sent =>: #{protocol_message.action} #{protocol_message}" + end + end + def clear_timer if @timer @timer.cancel @timer = nil end @@ -119,21 +136,22 @@ def setup_driver @driver = WebSocket::Driver.client(self) driver.on("open") do - logger.debug "WebSocket connection opened to #{url}, waiting for Connected protocol message" + logger.debug "WebsocketTransport: socket opened to #{url}, waiting for Connected protocol message" end driver.on("message") do |event| event_data = parse_event_data(event.data).freeze protocol_message = Ably::Models::ProtocolMessage.new(event_data) - logger.debug "Prot msg recv <=: #{protocol_message.action} #{event_data}" + logger.debug "WebsocketTransport: Prot msg recv <=: #{protocol_message.action} #{event_data}" + if protocol_message.invalid? - logger.error "Invalid Protocol Message received: #{event_data}\nNo action taken" + logger.fatal "WebsocketTransport: Invalid Protocol Message received: #{event_data}\nNo action taken" else - connection.__incoming_protocol_msgbus__.publish :message, protocol_message + __incoming_protocol_msgbus__.publish :protocol_message, protocol_message end end end def client @@ -148,14 +166,23 @@ def parse_event_data(data) case client.protocol when :json JSON.parse(data) when :msgpack - MessagePack.unpack(data.pack('c*')) + MessagePack.unpack(data.pack('C*')) else - client.logger.error "Unsupported Protocol Message format #{client.protocol}" + client.logger.fatal "WebsocketTransport: Unsupported Protocol Message format #{client.protocol}" data end + end + + def create_pub_sub_message_bus + Ably::Util::PubSub.new( + coerce_into: Proc.new do |event| + raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message + :protocol_message + end + ) end end end end