lib/rflow/connection.rb in rflow-1.0.0a2 vs lib/rflow/connection.rb in rflow-1.0.0a3

- old
+ new

@@ -5,10 +5,12 @@ class << self def build(config) case config.type when 'RFlow::Configuration::ZMQConnection' RFlow::Connections::ZMQConnection.new(config) + when 'RFlow::Configuration::BrokeredZMQConnection' + RFlow::Connections::BrokeredZMQConnection.new(config) else raise ArgumentError, "Only ZMQConnections currently supported" end end end @@ -16,10 +18,11 @@ attr_accessor :config, :uuid, :name, :options attr_writer :recv_callback protected attr_reader :recv_callback + public def initialize(config) @config = config @uuid = config.uuid @name = config.name @options = config.options @@ -54,10 +57,13 @@ # process_message method. Sublcass is responsible for # deserializing whatever was on the wire into a RFlow::Message object def recv_callback @recv_callback ||= Proc.new {|message|} end + + def input_port_key; config.input_port_key; end + def output_port_key; config.output_port_key; end end # Primarily for testing purposes. Captures whatever messages are sent on it. class MessageCollectingConnection < Connection attr_accessor :messages @@ -75,33 +81,31 @@ # Manually shuffle messages in-process from one output port to another output # port. Can be used to get a Facade pattern effect - to have one component # contain other components within it, shuttling messages in and out without # making the internal component visible to the larger RFlow network. class ForwardToOutputPort < Connection - def initialize(receiver, port_name) + def initialize(target_port) super(RFlow::Configuration::NullConfiguration.new) - @receiver = receiver - @port_name = port_name.to_sym + @target_port = target_port end def send_message(message) - @receiver.send(@port_name).send_message(message) + @target_port.send_message(message) end end # Manually shuffle messages in-process from one output port to another output # port. Can be used to get a Facade pattern effect - to have one component # contain other components within it, shuttling messages in and out without # making the internal component visible to the larger RFlow network. class ForwardToInputPort < Connection - def initialize(receiver, port_name, port_key) + def initialize(target_port) super(RFlow::Configuration::NullConfiguration.new) - @receiver = receiver - @port_name = port_name.to_sym - @port_key = port_key + @receiver = target_port.component + @target_port = target_port end def send_message(message) - @receiver.process_message(@receiver.send(@port_name), @port_key, self, message) + @receiver.process_message(@target_port, nil, self, message) end end end