lib/ably/realtime/connection/websocket_transport.rb in ably-0.1.6 vs lib/ably/realtime/connection/websocket_transport.rb in ably-0.2.0
- old
+ new
@@ -2,11 +2,10 @@
class Connection
# EventMachine WebSocket transport
# @api private
class WebsocketTransport < EventMachine::Connection
include Ably::Modules::EventEmitter
- include Ably::Modules::Conversions
extend Ably::Modules::Enum
# Valid WebSocket connection states
STATE = ruby_enum('STATE',
:initialized,
@@ -15,32 +14,21 @@
:disconnecting,
:disconnected
)
include Ably::Modules::StateEmitter
- def initialize(connection)
+ def initialize(connection, url)
@connection = connection
@state = STATE.Initialized
- end
+ @url = url
- # Send object down the WebSocket driver connection as a serialized string/byte array based on protocol
- # @param [Object] object to serialize and send to the WebSocket driver
- # @api public
- def send_object(object)
- case client.protocol
- when :json
- driver.text(object.to_json)
- when :msgpack
- driver.binary(object.to_msgpack.unpack('c*'))
- else
- client.logger.error "Unsupported protocol '#{client.protocol}' for serialization, object cannot be serialized and sent to Ably over this WebSocket"
- end
+ setup_event_handlers
end
# Disconnect the socket transport connection and write all pending text.
# If Disconnected state is not automatically triggered, it will be triggered automatically
- # @return <void>
+ # @return [void]
# @api public
def disconnect
close_connection_after_writing
change_state STATE.Disconnecting
create_timer(2) do
@@ -79,17 +67,11 @@
end
# URL end point including initialization configuration
# {http://www.rubydoc.info/gems/websocket-driver/0.3.5/WebSocket/Driver WebSocket::Driver} interface
def url
- URI(client.endpoint).tap do |endpoint|
- endpoint.query = URI.encode_www_form(client.auth.auth_params.merge(
- timestamp: as_since_epoch(Time.now),
- format: client.protocol,
- echo: client.echo_messages
- ))
- end.to_s
+ @url
end
# {http://www.rubydoc.info/gems/websocket-driver/0.3.5/WebSocket/Driver WebSocket::Driver} interface
def write(data)
send_data(data)
@@ -99,13 +81,48 @@
# i.e. it is not currently connecting or connected
def ready_for_release?
!connecting? && !connected?
end
+ # @!attribute [r] __incoming_protocol_msgbus__
+ # @return [Ably::Util::PubSub] Websocket Transport internal incoming protocol message bus
+ # @api private
+ def __incoming_protocol_msgbus__
+ @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
+ end
+
+ # @!attribute [r] __outgoing_protocol_msgbus__
+ # @return [Ably::Util::PubSub] Websocket Transport internal outgoing protocol message bus
+ # @api private
+ def __outgoing_protocol_msgbus__
+ @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
+ end
+
private
attr_reader :connection, :driver
+ # Send object down the WebSocket driver connection as a serialized string/byte array based on protocol
+ # @param [Object] object to serialize and send to the WebSocket driver
+ # @api public
+ def send_object(object)
+ case client.protocol
+ when :json
+ driver.text(object.to_json)
+ when :msgpack
+ driver.binary(object.to_msgpack.unpack('C*'))
+ else
+ client.logger.fatal "WebsocketTransport: Unsupported protocol '#{client.protocol}' for serialization, object cannot be serialized and sent to Ably over this WebSocket"
+ end
+ end
+
+ def setup_event_handlers
+ __outgoing_protocol_msgbus__.subscribe(:protocol_message) do |protocol_message|
+ send_object protocol_message
+ client.logger.debug "WebsocketTransport: Prot msg sent =>: #{protocol_message.action} #{protocol_message}"
+ end
+ end
+
def clear_timer
if @timer
@timer.cancel
@timer = nil
end
@@ -119,21 +136,22 @@
def setup_driver
@driver = WebSocket::Driver.client(self)
driver.on("open") do
- logger.debug "WebSocket connection opened to #{url}, waiting for Connected protocol message"
+ logger.debug "WebsocketTransport: socket opened to #{url}, waiting for Connected protocol message"
end
driver.on("message") do |event|
event_data = parse_event_data(event.data).freeze
protocol_message = Ably::Models::ProtocolMessage.new(event_data)
- logger.debug "Prot msg recv <=: #{protocol_message.action} #{event_data}"
+ logger.debug "WebsocketTransport: Prot msg recv <=: #{protocol_message.action} #{event_data}"
+
if protocol_message.invalid?
- logger.error "Invalid Protocol Message received: #{event_data}\nNo action taken"
+ logger.fatal "WebsocketTransport: Invalid Protocol Message received: #{event_data}\nNo action taken"
else
- connection.__incoming_protocol_msgbus__.publish :message, protocol_message
+ __incoming_protocol_msgbus__.publish :protocol_message, protocol_message
end
end
end
def client
@@ -148,14 +166,23 @@
def parse_event_data(data)
case client.protocol
when :json
JSON.parse(data)
when :msgpack
- MessagePack.unpack(data.pack('c*'))
+ MessagePack.unpack(data.pack('C*'))
else
- client.logger.error "Unsupported Protocol Message format #{client.protocol}"
+ client.logger.fatal "WebsocketTransport: Unsupported Protocol Message format #{client.protocol}"
data
end
+ end
+
+ def create_pub_sub_message_bus
+ Ably::Util::PubSub.new(
+ coerce_into: Proc.new do |event|
+ raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message
+ :protocol_message
+ end
+ )
end
end
end
end