lib/rflow/connection.rb in rflow-1.3.0 vs lib/rflow/connection.rb in rflow-1.3.1

- old
+ new

@@ -1,10 +1,13 @@ require 'rflow/message' class RFlow + # Represents an RFlow connection from one component to another. class Connection class << self + # Build an appropriate subclass of {Connection} based on the configuration. + # @return [Connection] def build(config) case config.type when 'RFlow::Configuration::ZMQConnection' RFlow::Connections::ZMQConnection.new(config) when 'RFlow::Configuration::BrokeredZMQConnection' @@ -13,11 +16,26 @@ raise ArgumentError, 'Only ZMQConnections currently supported' end end end - attr_accessor :config, :uuid, :name, :options + # The reference to the connection's configuration. + # @return [Configuration::Connection] + attr_accessor :config + + # The connection's UUID. + # @return [String] + attr_accessor :uuid + + # The connection's name. + # @return [String] + attr_accessor :name + + # The connection's options Hash. + # @return [Hash] + attr_accessor :options + attr_writer :recv_callback protected attr_reader :recv_callback public @@ -26,55 +44,68 @@ @uuid = config.uuid @name = config.name @options = config.options end - # Subclass and implement to be able to handle future 'recv' + # Subclass and implement to be able to handle future +recv+ # methods. Will only be called in the context of a running - # EventMachine reactor + # EventMachine reactor. + # @return [void] def connect_input! raise NotImplementedError, 'Raw connections do not support connect_input. Please subclass and define a connect_input method.' end - # Subclass and implement to be able to handle future 'send' + # Subclass and implement to be able to handle future +send+ # methods. Will only be called in the context of a running - # EventMachine reactor + # EventMachine reactor. + # @return [void] def connect_output! raise NotImplementedError, 'Raw connections do not support connect_output. Please subclass and define a connect_output method.' end # Subclass and implement to handle outgoing messages. The message - # will be a RFlow::Message object and the subclasses are expected + # will be a {RFlow::Message} object and the subclasses are expected # to marshal it up into something that will be unmarshalled on the - # other side + # other side. + # @return [void] def send_message(message) raise NotImplementedError, 'Raw connections do not support send_message. Please subclass and define a send_message method.' end # Parent component will set this attribute if it expects to - # recieve messages. Connection subclass should call it - # (recv_callback.call(message)) when it gets a new message, which + # receive messages. {Connection} subclass should call it + # (<tt>recv_callback.call(message)</tt>) when it gets a new message, which # will be transmitted back to the parent component's - # process_message method. Sublcass is responsible for - # deserializing whatever was on the wire into a RFlow::Message object + # {Component#process_message} method. Subclass is responsible for + # deserializing whatever was on the wire into a {RFlow::Message} object. + # @return [Proc] def recv_callback @recv_callback ||= Proc.new {|message|} end + # If we are connected to an {Component::InputPort} subport, the key for that subport. + # @return [String] def input_port_key; config.input_port_key; end + + # If we are connected to an {Component::OutputPort} subport, the key for that subport. + # @return [String] def output_port_key; config.output_port_key; end end # Primarily for testing purposes. Captures whatever messages are sent on it. class MessageCollectingConnection < Connection + # The messages that were collected. + # @return [Array<RFlow::Message>] attr_accessor :messages def initialize super(RFlow::Configuration::NullConnectionConfiguration.new) @messages = [] end + # Override of {send_message} which adds the message to {messages}. + # @return [void] def send_message(message) @messages << message end end @@ -86,10 +117,12 @@ def initialize(target_port) super(RFlow::Configuration::NullConnectionConfiguration.new) @target_port = target_port end + # Override of {send_message} which forwards the message to the target port. + # @return [void] def send_message(message) @target_port.send_message(message) end end @@ -102,9 +135,11 @@ super(RFlow::Configuration::NullConnectionConfiguration.new) @receiver = target_port.component @target_port = target_port end + # Override of {send_message} which forwards the message to the target port. + # @return [void] def send_message(message) @receiver.process_message(@target_port, nil, self, message) end end end