lib/rflow/component/port.rb in rflow-1.3.0 vs lib/rflow/component/port.rb in rflow-1.3.1
- old
+ new
@@ -1,70 +1,106 @@
class RFlow
class Component
# TODO: make this into a class to limit the amount of extensions
# that we have to do when operating on these 'Arrays', i.e. when
# adding two together
+ # @!visibility private
module ConnectionCollection
+ # @!visibility private
def send_message(message)
each {|connection| connection.send_message(message) }
end
end
# Collection class to make it easier to index by both names
# and types.
class PortCollection
- attr_reader :ports, :by_name, :by_type
+ # All the ports in the collection.
+ # @return [Array<Port>]
+ attr_reader :ports
+ # All the ports in the collection, indexed by name.
+ # @return [Hash<String, Port>]
+ attr_reader :by_name
+ # All the ports in the collection, indexed by type ({InputPort}, {OutputPort}).
+ # @return [Hash<String, Array<Port>>]
+ attr_reader :by_type
def initialize
@ports = []
@by_name = {}
@by_type = Hash.new {|hash, key| hash[key.to_s] = []}
end
+ # Add a port to the collection.
+ # @param port [Port] port to add
+ # @return [PortCollection] self
def <<(port)
by_name[port.name.to_s] = port
by_type[port.class.to_s] << port
ports << port
self
end
- # Enumerate through each port
+ # Enumerate through each port, +yield+ing each.
# TODO: simplify with enumerators and procs
+ # @return [Array<Port>]
def each
ports.each {|port| yield port }
end
end
+ # An input or output port on a {Component}.
class Port
- attr_reader :connected, :component
+ # True if there are connections to the port.
+ # @return [boolean]
+ attr_reader :connected
+ # The {Component} this port belongs to.
+ # @return [Component]
+ attr_reader :component
def initialize(component)
@component = component
end
+ # Synonym for {connected}.
+ # @return [boolean]
def connected?; connected; end
end
- # Stateless class to help with a nicer API
+ # Represents a keyed subport on a {Component} - that is, an input or output port
+ # that has been subscripted with a port name for subdividing the messages being
+ # received or output.
class HashSubPort
+ # @param hash_port [HashPort] the port to which this subport belongs
+ # @param key [String] the key subscript
def initialize(hash_port, key)
@hash_port = hash_port
@key = key
end
+ # Send a {Message} down all the connections to this subport.
+ # @param message [Message]
+ # @return [void]
def send_message(message)
connections.each {|connection| connection.send_message(message) }
end
+ # Retrieve all the connections for this subport.
+ # @return [Array<Connection>]
def connections
@hash_port.connections_for(@key)
end
+ # Directly connect this subport to another port.
+ # @param other_port [Port] the other port to connect to
+ # @return [void]
def direct_connect(other_port)
@hash_port.direct_connect(@key, other_port)
end
+ # Enumerate the connections to this subport, +yield+ing each.
+ # @return [Array<Connection>]
def each
connections.each {|connection| yield connection }
end
end
@@ -73,53 +109,76 @@
# output port will result in messages from all indexed connections
# being received. Similarly, sending to an unindexed port will
# result in the same message being sent to all indexed
# connections.
class HashPort < Port
- attr_accessor :name, :uuid
+ # The name of the port.
+ # @return [String]
+ attr_accessor :name
+ # The UUID of the port.
+ # @return [String]
+ attr_accessor :uuid
public
+ # @param component [Component] the component the port belongs to
+ # @param args [Hash] supported args are +:uuid+ and +:name+
def initialize(component, args = {})
super(component)
self.uuid = args[:uuid]
self.name = args[:name]
@connections_for = Hash.new {|hash, key| hash[key] = []}
end
# Get the subport for a given key, which can be used to send messages
- # or direct connection
+ # or direct connection.
+ # @param key [String] the key to subscript with
+ # @return [HashSubPort]
def [](key)
HashSubPort.new(self, key)
end
- # Returns an Array of all the connections that should
- # be sent/received on this subport. Merges the nil-keyed port
+ # Returns all the connections that should
+ # be sent/received on this subport. Merges the +nil+-keyed port
# (i.e. any connections for a port without a key) to those
# specific for the key, so should only be used to read a list of
- # connections, not to add new ones. Use add_connection to add a
+ # connections, not to add new ones. Use {add_connection} to add a
# new connection for a given key.
+ # @param key [String] the key to subscript with
+ # @return [Array<Connection>]
def connections_for(key)
case key
when nil; @connections_for[nil]
else @connections_for[key] + @connections_for[nil]
end
end
- # Adds a connection for a given key
+ # Adds a connection for a given key.
+ # @param key [String] the key to subscript with
+ # @param connection [Connection] the connection to add
+ # @return [void]
def add_connection(key, connection)
RFlow.logger.debug "Attaching #{connection.class.name} connection '#{connection.name}' (#{connection.uuid}) to port '#{name}' (#{uuid}), key '#{connection.input_port_key}'"
@connections_for[key] << connection
@all_connections = nil
end
- # Removes a connection from a given key
+ # Removes a connection from a given key.
+ # @param key [String] the key to subscript with
+ # @param connection [Connection] the connection to remove
+ # @return [void]
def remove_connection(key, connection)
RFlow.logger.debug "Removing #{connection.class.name} connection '#{connection.name}' (#{connection.uuid}) from port '#{name}' (#{uuid}), key '#{connection.input_port_key}'"
@connections_for[key].delete(connection)
@all_connections = nil
end
+ # Collect messages being sent to this port in a {MessageCollectingConnection}
+ # for retrieval later, usually for unit testing purposes. +yield+s after
+ # establishing the new connection.
+ # @param key [String] the key to subscript with
+ # @param receiver [Array] array in which to place arriving messages
+ # @return [MessageCollectingConnection]
def collect_messages(key, receiver)
begin
connection = RFlow::MessageCollectingConnection.new.tap do |c|
c.messages = receiver
end
@@ -130,51 +189,80 @@
ensure
remove_connection key, connection if connection && block_given?
end
end
+ # Directly connect this port to another port. If it's an input port,
+ # forward messages to that input port; if it's an output port,
+ # forward messages so they appear to come from that output port.
+ # @param key [String] the key to subscript with
+ # @param other_port [Port] the port to forward to
+ # @return [void]
def direct_connect(key = nil, other_port)
case other_port
when InputPort; add_connection key, ForwardToInputPort.new(other_port)
when OutputPort; add_connection key, ForwardToOutputPort.new(other_port)
else raise ArgumentError, "Unknown port type #{other_port.class.name}"
end
end
- # Return a list of connected keys
+ # A list of connected keys.
+ # @return [Array<String>]
def keys
@connections_for.keys
end
+ # Enumerate all connections, +yield+ing each.
+ # @return [Array<Connection>]
def each
@connections_for.values.each {|connections| yield connections }
end
+ # Override in subclasses to actually send messages places.
+ # @param message [Message] the message to send
+ # @return [void]
def send_message(message)
raise NotImplementedError, 'Raw ports do not know how to send messages'
end
- # Should be overridden. Called when it is time to actually
- # establish the connection
+ # Override in subclasses to handle establishing the connection.
+ # @return [void]
def connect!; raise NotImplementedError, 'Raw ports do not know which direction to connect'; end
+ # Retrieve all connections to the port, regardless of key. The resulting +Array+
+ # also supports +send_message(message)+ which will deliver the message on all
+ # connections.
+ # @return [Array<Connection>]
def all_connections
@all_connections ||= @connections_for.values.flatten.uniq.extend(ConnectionCollection)
end
end
+ # An actual {Component} input port.
class InputPort < HashPort
+ # Connect all the input connections, once everything's been set up.
+ # @return [void]
def connect!
@connections_for.each {|key, conns| conns.each {|c| c.connect_input! } }
@connected = true
end
+ # Add and start up a new {Connection}.
+ # @param key [String] the key to subscript with
+ # @param connection [Connection] the connection to add
+ # @return [void]
def add_connection(key, connection)
super
connection.connect_input! if connected?
end
+ # Once things have been set up, registering the receive callback
+ # will set it on all connections, so that when messages are received,
+ # they are delivered on all connections with appropriate key and connection
+ # information from the context of the connection.
+ # @param callback [Proc] the receive callback
+ # @return [void]
def recv_callback=(callback)
@connections_for.each do |key, connections|
connections.each do |connection|
connection.recv_callback = Proc.new do |message|
callback.call self, key, connection, message
@@ -182,22 +270,31 @@
end
end
end
end
+ # An actual {Component} output port.
class OutputPort < HashPort
+ # Connect all the output connections, once everything's been set up.
+ # @return [void]
def connect!
@connections_for.each {|key, conns| conns.each {|c| c.connect_output! } }
@connected = true
end
+ # Add and start up a new {Connection}.
+ # @param key [String] the key to subscript with
+ # @param connection [Connection] the connection to add
+ # @return [void]
def add_connection(key, connection)
super
connection.connect_output! if connected?
end
# Send a message to all connections on all keys for this port,
# but only once per connection.
+ # @param message [RFlow::Message] the message to send
+ # @return [void]
def send_message(message)
all_connections.send_message(message)
end
end
end