lib/rflow/component/port.rb in rflow-1.0.1 vs lib/rflow/component/port.rb in rflow-1.1.0
- old
+ new
@@ -1,9 +1,9 @@
class RFlow
class Component
# TODO: make this into a class to limit the amount of extensions
- # that we have to do when operating on these "Arrays", i.e. when
+ # that we have to do when operating on these 'Arrays', i.e. when
# adding two together
module ConnectionCollection
def send_message(message)
each {|connection| connection.send_message(message) }
end
@@ -42,54 +42,81 @@
end
def connected?; connected; end
end
+ # Stateless class to help with a nicer API
+ class HashSubPort
+ def initialize(hash_port, key)
+ @hash_port = hash_port
+ @key = key
+ end
+
+ def send_message(message)
+ connections.each {|connection| connection.send_message(message) }
+ end
+
+ def connections
+ @hash_port.connections_for(@key)
+ end
+
+ def direct_connect(other_port)
+ @hash_port.direct_connect(@key, other_port)
+ end
+
+ def each
+ connections.each {|connection| yield connection }
+ end
+ end
+
# Allows for a list of connections to be assigned to each port/key
# combination. Note that binding an input port to an un-indexed
# output port will result in messages from all indexed connections
# being received. Similarly, sending to an unindexed port will
# result in the same message being sent to all indexed
# connections.
class HashPort < Port
attr_accessor :name, :uuid
- protected
- attr_reader :connections_for
-
public
def initialize(component, args = {})
super(component)
self.uuid = args[:uuid]
self.name = args[:name]
- @connections_for = Hash.new {|hash, key| hash[key] = [].extend(ConnectionCollection)}
+ @connections_for = Hash.new {|hash, key| hash[key] = []}
end
- # Returns an extended Array of all the connections that should
- # be sent/received on this port. Merges the nil-keyed port
+ # Get the subport for a given key, which can be used to send messages
+ # or direct connection
+ def [](key)
+ HashSubPort.new(self, key)
+ end
+
+ # Returns an Array of all the connections that should
+ # be sent/received on this subport. Merges the nil-keyed port
# (i.e. any connections for a port without a key) to those
# specific for the key, so should only be used to read a list of
# connections, not to add new ones. Use add_connection to add a
# new connection for a given key.
- def [](key)
+ def connections_for(key)
case key
- when nil; connections_for[nil]
- else connections_for[key] + connections_for[nil]
- end.extend(ConnectionCollection)
+ when nil; @connections_for[nil]
+ else @connections_for[key] + @connections_for[nil]
+ end
end
# Adds a connection for a given key
def add_connection(key, connection)
RFlow.logger.debug "Attaching #{connection.class.name} connection '#{connection.name}' (#{connection.uuid}) to port '#{name}' (#{uuid}), key '#{connection.input_port_key}'"
- connections_for[key] << connection
+ @connections_for[key] << connection
@all_connections = nil
end
# Removes a connection from a given key
def remove_connection(key, connection)
RFlow.logger.debug "Removing #{connection.class.name} connection '#{connection.name}' (#{connection.uuid}) from port '#{name}' (#{uuid}), key '#{connection.input_port_key}'"
- connections_for[key].delete(connection)
+ @connections_for[key].delete(connection)
@all_connections = nil
end
def collect_messages(key, receiver)
begin
@@ -103,55 +130,53 @@
ensure
remove_connection key, connection if connection && block_given?
end
end
- def direct_connect(other_port)
+ def direct_connect(key = nil, other_port)
case other_port
- when InputPort; add_connection nil, ForwardToInputPort.new(other_port)
- when OutputPort; add_connection nil, ForwardToOutputPort.new(other_port)
+ when InputPort; add_connection key, ForwardToInputPort.new(other_port)
+ when OutputPort; add_connection key, ForwardToOutputPort.new(other_port)
else raise ArgumentError, "Unknown port type #{other_port.class.name}"
end
end
# Return a list of connected keys
def keys
- connections_for.keys
+ @connections_for.keys
end
- # Enumerate through all the ConnectionCollections
- # TODO: simplify with enumerators and procs
def each
- connections_for.values.each {|connections| yield connections }
+ @connections_for.values.each {|connections| yield connections }
end
def send_message(message)
- def connect!; raise NotImplementedError, "Raw ports do not know how to send messages"; end
+ raise NotImplementedError, 'Raw ports do not know how to send messages'
end
# Should be overridden. Called when it is time to actually
# establish the connection
- def connect!; raise NotImplementedError, "Raw ports do not know which direction to connect"; end
+ def connect!; raise NotImplementedError, 'Raw ports do not know which direction to connect'; end
def all_connections
- @all_connections ||= connections_for.values.flatten.uniq.extend(ConnectionCollection)
+ @all_connections ||= @connections_for.values.flatten.uniq.extend(ConnectionCollection)
end
end
class InputPort < HashPort
def connect!
- connections_for.each {|key, conns| conns.each {|c| c.connect_input! } }
+ @connections_for.each {|key, conns| conns.each {|c| c.connect_input! } }
@connected = true
end
def add_connection(key, connection)
super
connection.connect_input! if connected?
end
def recv_callback=(callback)
- connections_for.each do |key, connections|
+ @connections_for.each do |key, connections|
connections.each do |connection|
connection.recv_callback = Proc.new do |message|
callback.call self, key, connection, message
end
end
@@ -159,10 +184,10 @@
end
end
class OutputPort < HashPort
def connect!
- connections_for.each {|key, conns| conns.each {|c| c.connect_output! } }
+ @connections_for.each {|key, conns| conns.each {|c| c.connect_output! } }
@connected = true
end
def add_connection(key, connection)
super