lib/rflow/connection.rb in rflow-1.3.0 vs lib/rflow/connection.rb in rflow-1.3.1
- old
+ new
@@ -1,10 +1,13 @@
require 'rflow/message'
class RFlow
+ # Represents an RFlow connection from one component to another.
class Connection
class << self
+ # Build an appropriate subclass of {Connection} based on the configuration.
+ # @return [Connection]
def build(config)
case config.type
when 'RFlow::Configuration::ZMQConnection'
RFlow::Connections::ZMQConnection.new(config)
when 'RFlow::Configuration::BrokeredZMQConnection'
@@ -13,11 +16,26 @@
raise ArgumentError, 'Only ZMQConnections currently supported'
end
end
end
- attr_accessor :config, :uuid, :name, :options
+ # The reference to the connection's configuration.
+ # @return [Configuration::Connection]
+ attr_accessor :config
+
+ # The connection's UUID.
+ # @return [String]
+ attr_accessor :uuid
+
+ # The connection's name.
+ # @return [String]
+ attr_accessor :name
+
+ # The connection's options Hash.
+ # @return [Hash]
+ attr_accessor :options
+
attr_writer :recv_callback
protected
attr_reader :recv_callback
public
@@ -26,55 +44,68 @@
@uuid = config.uuid
@name = config.name
@options = config.options
end
- # Subclass and implement to be able to handle future 'recv'
+ # Subclass and implement to be able to handle future +recv+
# methods. Will only be called in the context of a running
- # EventMachine reactor
+ # EventMachine reactor.
+ # @return [void]
def connect_input!
raise NotImplementedError, 'Raw connections do not support connect_input. Please subclass and define a connect_input method.'
end
- # Subclass and implement to be able to handle future 'send'
+ # Subclass and implement to be able to handle future +send+
# methods. Will only be called in the context of a running
- # EventMachine reactor
+ # EventMachine reactor.
+ # @return [void]
def connect_output!
raise NotImplementedError, 'Raw connections do not support connect_output. Please subclass and define a connect_output method.'
end
# Subclass and implement to handle outgoing messages. The message
- # will be a RFlow::Message object and the subclasses are expected
+ # will be a {RFlow::Message} object and the subclasses are expected
# to marshal it up into something that will be unmarshalled on the
- # other side
+ # other side.
+ # @return [void]
def send_message(message)
raise NotImplementedError, 'Raw connections do not support send_message. Please subclass and define a send_message method.'
end
# Parent component will set this attribute if it expects to
- # recieve messages. Connection subclass should call it
- # (recv_callback.call(message)) when it gets a new message, which
+ # receive messages. {Connection} subclass should call it
+ # (<tt>recv_callback.call(message)</tt>) when it gets a new message, which
# will be transmitted back to the parent component's
- # process_message method. Sublcass is responsible for
- # deserializing whatever was on the wire into a RFlow::Message object
+ # {Component#process_message} method. Subclass is responsible for
+ # deserializing whatever was on the wire into a {RFlow::Message} object.
+ # @return [Proc]
def recv_callback
@recv_callback ||= Proc.new {|message|}
end
+ # If we are connected to an {Component::InputPort} subport, the key for that subport.
+ # @return [String]
def input_port_key; config.input_port_key; end
+
+ # If we are connected to an {Component::OutputPort} subport, the key for that subport.
+ # @return [String]
def output_port_key; config.output_port_key; end
end
# Primarily for testing purposes. Captures whatever messages are sent on it.
class MessageCollectingConnection < Connection
+ # The messages that were collected.
+ # @return [Array<RFlow::Message>]
attr_accessor :messages
def initialize
super(RFlow::Configuration::NullConnectionConfiguration.new)
@messages = []
end
+ # Override of {send_message} which adds the message to {messages}.
+ # @return [void]
def send_message(message)
@messages << message
end
end
@@ -86,10 +117,12 @@
def initialize(target_port)
super(RFlow::Configuration::NullConnectionConfiguration.new)
@target_port = target_port
end
+ # Override of {send_message} which forwards the message to the target port.
+ # @return [void]
def send_message(message)
@target_port.send_message(message)
end
end
@@ -102,9 +135,11 @@
super(RFlow::Configuration::NullConnectionConfiguration.new)
@receiver = target_port.component
@target_port = target_port
end
+ # Override of {send_message} which forwards the message to the target port.
+ # @return [void]
def send_message(message)
@receiver.process_message(@target_port, nil, self, message)
end
end
end