lib/rflow/connection.rb in rflow-1.0.0a2 vs lib/rflow/connection.rb in rflow-1.0.0a3
- old
+ new
@@ -5,10 +5,12 @@
class << self
def build(config)
case config.type
when 'RFlow::Configuration::ZMQConnection'
RFlow::Connections::ZMQConnection.new(config)
+ when 'RFlow::Configuration::BrokeredZMQConnection'
+ RFlow::Connections::BrokeredZMQConnection.new(config)
else
raise ArgumentError, "Only ZMQConnections currently supported"
end
end
end
@@ -16,10 +18,11 @@
attr_accessor :config, :uuid, :name, :options
attr_writer :recv_callback
protected
attr_reader :recv_callback
+ public
def initialize(config)
@config = config
@uuid = config.uuid
@name = config.name
@options = config.options
@@ -54,10 +57,13 @@
# process_message method. Sublcass is responsible for
# deserializing whatever was on the wire into a RFlow::Message object
def recv_callback
@recv_callback ||= Proc.new {|message|}
end
+
+ def input_port_key; config.input_port_key; end
+ def output_port_key; config.output_port_key; end
end
# Primarily for testing purposes. Captures whatever messages are sent on it.
class MessageCollectingConnection < Connection
attr_accessor :messages
@@ -75,33 +81,31 @@
# Manually shuffle messages in-process from one output port to another output
# port. Can be used to get a Facade pattern effect - to have one component
# contain other components within it, shuttling messages in and out without
# making the internal component visible to the larger RFlow network.
class ForwardToOutputPort < Connection
- def initialize(receiver, port_name)
+ def initialize(target_port)
super(RFlow::Configuration::NullConfiguration.new)
- @receiver = receiver
- @port_name = port_name.to_sym
+ @target_port = target_port
end
def send_message(message)
- @receiver.send(@port_name).send_message(message)
+ @target_port.send_message(message)
end
end
# Manually shuffle messages in-process from one output port to another output
# port. Can be used to get a Facade pattern effect - to have one component
# contain other components within it, shuttling messages in and out without
# making the internal component visible to the larger RFlow network.
class ForwardToInputPort < Connection
- def initialize(receiver, port_name, port_key)
+ def initialize(target_port)
super(RFlow::Configuration::NullConfiguration.new)
- @receiver = receiver
- @port_name = port_name.to_sym
- @port_key = port_key
+ @receiver = target_port.component
+ @target_port = target_port
end
def send_message(message)
- @receiver.process_message(@receiver.send(@port_name), @port_key, self, message)
+ @receiver.process_message(@target_port, nil, self, message)
end
end
end