lib/rflow/configuration/connection.rb in rflow-1.0.0a2 vs lib/rflow/configuration/connection.rb in rflow-1.0.0a3

- old
+ new

@@ -44,10 +44,13 @@ # the option names and the values are either default option # values or Procs that take a single connection argument. This # allow defaults to use other parameters in the connection to # construct the appropriate default value. def self.default_options; {}; end + + # By default, no broker processes are required to manage a connection. + def brokers; []; end end # STI Subclass for ZMQ connections and their required options class ZMQConnection < Connection def self.default_options @@ -60,33 +63,48 @@ 'input_responsibility' => 'bind', } end end - # STI Subclass for AMQP connections and their required options - class AMQPConnection < Connection + # STI Subclass for brokered ZMQ connections and their required options + # + # We name the IPCs to resemble a quasi-component. Outputting to this + # connection goes to the 'in' of the IPC pair. Reading input from this + # connection comes from the 'out' of the IPC pair. + # + # The broker shuttles messages between the two to support the many-to-many + # delivery pattern. + class BrokeredZMQConnection < Connection def self.default_options { - 'host' => 'localhost', - 'port' => 5672, - 'insist' => true, - 'vhost' => '/', - 'username' => 'guest', - 'password' => 'guest', - - # If a queue is created, these are the default parameters - # for said queue type - 'queue_passive' => false, - 'queue_durable' => true, - 'queue_exclusive' => false, - 'queue_auto_delete' => false, - 'queue_nowait' => true, + 'output_socket_type' => 'PUSH', + 'output_address' => lambda{|conn| "ipc://rflow.#{conn.uuid}.in"}, + 'output_responsibility' => 'connect', + 'input_socket_type' => 'PULL', + 'input_address' => lambda{|conn| "ipc://rflow.#{conn.uuid}.out"}, + 'input_responsibility' => 'connect', } end + + # A brokered ZMQ connection requires one broker process. + def brokers + @brokers ||= [ZMQStreamer.new(self)] + end end + # Represents the broker process configuration. No special parameters + # that can't be derived from the connection. Not persisted in the database - + # it's encapsulated in the nature of the connection. + class ZMQStreamer + attr_reader :connection + + def initialize(connection) + @connection = connection + end + end + # for testing purposes class NullConfiguration - attr_accessor :name, :uuid, :options + attr_accessor :name, :uuid, :options, :input_port_key, :output_port_key end end end