lib/rflow/connection.rb in rflow-1.0.0a1 vs lib/rflow/connection.rb in rflow-1.0.0a2

- old
+ new

@@ -1,10 +1,9 @@ require 'rflow/message' class RFlow class Connection - class << self def build(config) case config.type when 'RFlow::Configuration::ZMQConnection' RFlow::Connections::ZMQConnection.new(config) @@ -13,36 +12,35 @@ end end end attr_accessor :config, :uuid, :name, :options - attr_accessor :recv_callback + attr_writer :recv_callback + protected + attr_reader :recv_callback def initialize(config) @config = config @uuid = config.uuid @name = config.name @options = config.options end - # Subclass and implement to be able to handle future 'recv' # methods. Will only be called in the context of a running # EventMachine reactor def connect_input! - raise NotImplementedError, "Raw connections do not support connect_input. Please subclass and define a connect_output method." + raise NotImplementedError, "Raw connections do not support connect_input. Please subclass and define a connect_input method." end - # Subclass and implement to be able to handle future 'send' # methods. Will only be called in the context of a running # EventMachine reactor def connect_output! raise NotImplementedError, "Raw connections do not support connect_output. Please subclass and define a connect_output method." end - # Subclass and implement to handle outgoing messages. The message # will be a RFlow::Message object and the subclasses are expected # to marshal it up into something that will be unmarshalled on the # other side def send_message(message) @@ -56,15 +54,54 @@ # process_message method. Sublcass is responsible for # deserializing whatever was on the wire into a RFlow::Message object def recv_callback @recv_callback ||= Proc.new {|message|} end + end - end # class Connection + # Primarily for testing purposes. Captures whatever messages are sent on it. + class MessageCollectingConnection < Connection + attr_accessor :messages - class Disconnection < Connection + def initialize + super(RFlow::Configuration::NullConfiguration.new) + @messages = [] + end + def send_message(message) - RFlow.logger.debug "Attempting to send without a connection, doing nothing" + @messages << message end end -end # class RFlow + # Manually shuffle messages in-process from one output port to another output + # port. Can be used to get a Facade pattern effect - to have one component + # contain other components within it, shuttling messages in and out without + # making the internal component visible to the larger RFlow network. + class ForwardToOutputPort < Connection + def initialize(receiver, port_name) + super(RFlow::Configuration::NullConfiguration.new) + @receiver = receiver + @port_name = port_name.to_sym + end + + def send_message(message) + @receiver.send(@port_name).send_message(message) + end + end + + # Manually shuffle messages in-process from one output port to another output + # port. Can be used to get a Facade pattern effect - to have one component + # contain other components within it, shuttling messages in and out without + # making the internal component visible to the larger RFlow network. + class ForwardToInputPort < Connection + def initialize(receiver, port_name, port_key) + super(RFlow::Configuration::NullConfiguration.new) + @receiver = receiver + @port_name = port_name.to_sym + @port_key = port_key + end + + def send_message(message) + @receiver.process_message(@receiver.send(@port_name), @port_key, self, message) + end + end +end