lib/rflow/configuration/connection.rb in rflow-1.0.0a2 vs lib/rflow/configuration/connection.rb in rflow-1.0.0a3
- old
+ new
@@ -44,10 +44,13 @@
# the option names and the values are either default option
# values or Procs that take a single connection argument. This
# allow defaults to use other parameters in the connection to
# construct the appropriate default value.
def self.default_options; {}; end
+
+ # By default, no broker processes are required to manage a connection.
+ def brokers; []; end
end
# STI Subclass for ZMQ connections and their required options
class ZMQConnection < Connection
def self.default_options
@@ -60,33 +63,48 @@
'input_responsibility' => 'bind',
}
end
end
- # STI Subclass for AMQP connections and their required options
- class AMQPConnection < Connection
+ # STI Subclass for brokered ZMQ connections and their required options
+ #
+ # We name the IPCs to resemble a quasi-component. Outputting to this
+ # connection goes to the 'in' of the IPC pair. Reading input from this
+ # connection comes from the 'out' of the IPC pair.
+ #
+ # The broker shuttles messages between the two to support the many-to-many
+ # delivery pattern.
+ class BrokeredZMQConnection < Connection
def self.default_options
{
- 'host' => 'localhost',
- 'port' => 5672,
- 'insist' => true,
- 'vhost' => '/',
- 'username' => 'guest',
- 'password' => 'guest',
-
- # If a queue is created, these are the default parameters
- # for said queue type
- 'queue_passive' => false,
- 'queue_durable' => true,
- 'queue_exclusive' => false,
- 'queue_auto_delete' => false,
- 'queue_nowait' => true,
+ 'output_socket_type' => 'PUSH',
+ 'output_address' => lambda{|conn| "ipc://rflow.#{conn.uuid}.in"},
+ 'output_responsibility' => 'connect',
+ 'input_socket_type' => 'PULL',
+ 'input_address' => lambda{|conn| "ipc://rflow.#{conn.uuid}.out"},
+ 'input_responsibility' => 'connect',
}
end
+
+ # A brokered ZMQ connection requires one broker process.
+ def brokers
+ @brokers ||= [ZMQStreamer.new(self)]
+ end
end
+ # Represents the broker process configuration. No special parameters
+ # that can't be derived from the connection. Not persisted in the database -
+ # it's encapsulated in the nature of the connection.
+ class ZMQStreamer
+ attr_reader :connection
+
+ def initialize(connection)
+ @connection = connection
+ end
+ end
+
# for testing purposes
class NullConfiguration
- attr_accessor :name, :uuid, :options
+ attr_accessor :name, :uuid, :options, :input_port_key, :output_port_key
end
end
end