lib/rflow/configuration/connection.rb in rflow-1.3.0 vs lib/rflow/configuration/connection.rb in rflow-1.3.1

- old
+ new

@@ -1,60 +1,81 @@ require 'active_record' require 'rflow/configuration/uuid_keyed' class RFlow class Configuration + # Represents a component-to-component connection in the SQLite database. class Connection < ConfigurationItem + # Exception for when the connection is invalid. class ConnectionInvalid < StandardError; end include UUIDKeyed include ActiveModel::Validations + # @!attribute options + # Open-ended Hash of component options, serialized via YAML to a single column. + # @return [Hash] serialize :options, Hash + # @!attribute input_port + # The {InputPort} to which this {Connection} delivers messages. + # @return [InputPort] belongs_to :input_port, :primary_key => 'uuid', :foreign_key => 'input_port_uuid' + + # @!attribute output_port + # The {OutputPort} from which this {Connection} receives messages. + # @return [OutputPort] belongs_to :output_port,:primary_key => 'uuid', :foreign_key => 'output_port_uuid' before_create :merge_default_options! validates_uniqueness_of :uuid validates_presence_of :output_port_uuid, :input_port_uuid validate :all_required_options_present? + # @!visibility private def all_required_options_present? self.class.required_options.each do |option_name| unless self.options.include? option_name.to_s errors.add(:options, "must include #{option_name} for #{self.class.to_s}") end end end + # @!visibility private def merge_default_options! self.options ||= {} self.class.default_options.each do |name, default_value_or_proc| self.options[name.to_s] ||= default_value_or_proc.is_a?(Proc) ? default_value_or_proc.call(self) : default_value_or_proc end end # Should return a list of require option names which will be - # used in validations. To be overridden. + # used in validations. To be overridden by subclasses. + # @return [Array<String>] def self.required_options; []; end # Should return a hash of default options, where the keys are # the option names and the values are either default option # values or Procs that take a single connection argument. This # allow defaults to use other parameters in the connection to - # construct the appropriate default value. + # construct the appropriate default value. To be overridden + # by subclasses. + # @return [Hash] def self.default_options; {}; end # By default, no broker processes are required to manage a connection. + # To be overridden by subclasses. + # @return [Array<Broker>] def brokers; []; end end - # STI Subclass for ZMQ connections and their required options + # Subclass of {Connection} for ZMQ connections and their required options. class ZMQConnection < Connection + # Default options required for ZeroMQ connection. + # @return [Hash] def self.default_options { 'output_socket_type' => 'PUSH', 'output_address' => lambda{|conn| "ipc://rflow.#{conn.uuid}"}, 'output_responsibility' => 'connect', @@ -63,19 +84,20 @@ 'input_responsibility' => 'bind', } end end - # STI Subclass for brokered ZMQ connections and their required options + # Subclass of {Connection} for brokered ZMQ connections and their required options. # # We name the IPCs to resemble a quasi-component. Outputting to this - # connection goes to the 'in' of the IPC pair. Reading input from this - # connection comes from the 'out' of the IPC pair. + # connection goes to the +in+ of the IPC pair. Reading input from this + # connection comes from the +out+ of the IPC pair. # # The broker shuttles messages between the two to support the many-to-many # delivery pattern. class BrokeredZMQConnection < Connection + # Default ZeroMQ options required for broker connection. def self.default_options { 'output_socket_type' => 'PUSH', 'output_address' => lambda{|conn| "ipc://rflow.#{conn.uuid}.in"}, 'output_responsibility' => 'connect', @@ -84,27 +106,36 @@ 'input_responsibility' => 'connect', } end # A brokered ZMQ connection requires one broker process. + # @return [Array<Broker>] def brokers @brokers ||= [ZMQStreamer.new(self)] end end # Represents the broker process configuration. No special parameters # that can't be derived from the connection. Not persisted in the database - # it's encapsulated in the nature of the connection. class ZMQStreamer + # Backreference to the {Connection}. + # @return [Connection] attr_reader :connection def initialize(connection) @connection = connection end end - # for testing purposes + # For testing purposes only. + # @!visibility private class NullConnectionConfiguration - attr_accessor :name, :uuid, :options, :input_port_key, :output_port_key, :delivery + attr_accessor :name + attr_accessor :uuid + attr_accessor :options + attr_accessor :input_port_key + attr_accessor :output_port_key + attr_accessor :delivery end end end