lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a2 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a3
- old
+ new
@@ -40,10 +40,20 @@
shard_specs << @current_shard
yield self
@current_shard = default_shard
end
+ # shortcut
+ def process(name, options = {}, &block)
+ shard(name, options.merge(:type => :process), &block)
+ end
+
+ # shortcut
+ def thread(name, options = {}, &block)
+ shard(name, options.merge(:type => :thread), &block)
+ end
+
# DSL method to specify a component. Expects a name,
# specification, and set of component specific options, that
# must be marshallable into the database (i.e. should all be strings)
def component(name, specification, options = {})
@current_shard[:components] << {
@@ -83,16 +93,16 @@
# Method called within the config file itself
def self.configure
config_file = self.new
yield config_file
- config_file.process
+ config_file.process_objects
end
# Method to process the 'DSL' objects into the config database
# via ActiveRecord
- def process
+ def process_objects
process_setting_specs
process_shard_specs
process_connection_specs
end
@@ -152,12 +162,12 @@
end
end
# For each given connection, break up each input/output
# component/port specification, ensure that the component
- # already exists in the database (by name). Also, only supports
- # ZeroMQ ipc sockets
+ # already exists in the database (by name). Chooses the best
+ # connection type for any pair of components.
def process_connection_specs
connection_specs.each do |spec|
begin
RFlow.logger.debug "Found connection from '#{spec[:output_string]}' to '#{spec[:input_string]}', creating"
@@ -173,14 +183,42 @@
raise RFlow::Configuration::Connection::ConnectionInvalid,
"Component '#{spec[:input_component_name]}' not found at #{spec[:config_line]}" unless input_component
input_port = input_component.input_ports.find_or_initialize_by_name :name => spec[:input_port_name]
input_port.save!
- RFlow::Configuration::ZMQConnection.create!(:name => spec[:name],
- :output_port_key => spec[:output_port_key],
- :input_port_key => spec[:input_port_key],
- :output_port => output_port,
- :input_port => input_port)
+ output_shards = output_component.shard.count
+ input_shards = input_component.shard.count
+
+ in_shard_connection = output_component.shard == input_component.shard
+ one_to_one = output_shards == 1 && input_shards == 1
+ one_to_many = output_shards == 1 && input_shards > 1
+ many_to_one = output_shards > 1 && input_shards == 1
+ many_to_many = output_shards > 1 && input_shards > 1
+
+ connection_type = many_to_many ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection
+
+ conn = connection_type.create!(:name => spec[:name],
+ :output_port_key => spec[:output_port_key],
+ :input_port_key => spec[:input_port_key],
+ :output_port => output_port,
+ :input_port => input_port)
+
+ # bind on the cardinality-1 side, connect on the cardinality-n side
+ if in_shard_connection
+ conn.options['output_responsibility'] = 'connect'
+ conn.options['input_responsibility'] = 'bind'
+ conn.options['output_address'] = "inproc://rflow.#{conn.uuid}"
+ conn.options['input_address'] = "inproc://rflow.#{conn.uuid}"
+ elsif many_to_one
+ conn.options['output_responsibility'] = 'connect'
+ conn.options['input_responsibility'] = 'bind'
+ elsif one_to_many
+ conn.options['output_responsibility'] = 'bind'
+ conn.options['input_responsibility'] = 'connect'
+ end
+
+ conn.save!
+ conn
rescue Exception => e
# TODO: Figure out why an ArgumentError doesn't put the
# offending message into e.message, even though it is printed
# out if not caught
raise RFlow::Configuration::Connection::ConnectionInvalid, "#{e.class}: #{e.message} at config '#{spec[:config_line]}'"