lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a2 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a3

- old
+ new

@@ -40,10 +40,20 @@ shard_specs << @current_shard yield self @current_shard = default_shard end + # shortcut + def process(name, options = {}, &block) + shard(name, options.merge(:type => :process), &block) + end + + # shortcut + def thread(name, options = {}, &block) + shard(name, options.merge(:type => :thread), &block) + end + # DSL method to specify a component. Expects a name, # specification, and set of component specific options, that # must be marshallable into the database (i.e. should all be strings) def component(name, specification, options = {}) @current_shard[:components] << { @@ -83,16 +93,16 @@ # Method called within the config file itself def self.configure config_file = self.new yield config_file - config_file.process + config_file.process_objects end # Method to process the 'DSL' objects into the config database # via ActiveRecord - def process + def process_objects process_setting_specs process_shard_specs process_connection_specs end @@ -152,12 +162,12 @@ end end # For each given connection, break up each input/output # component/port specification, ensure that the component - # already exists in the database (by name). Also, only supports - # ZeroMQ ipc sockets + # already exists in the database (by name). Chooses the best + # connection type for any pair of components. def process_connection_specs connection_specs.each do |spec| begin RFlow.logger.debug "Found connection from '#{spec[:output_string]}' to '#{spec[:input_string]}', creating" @@ -173,14 +183,42 @@ raise RFlow::Configuration::Connection::ConnectionInvalid, "Component '#{spec[:input_component_name]}' not found at #{spec[:config_line]}" unless input_component input_port = input_component.input_ports.find_or_initialize_by_name :name => spec[:input_port_name] input_port.save! - RFlow::Configuration::ZMQConnection.create!(:name => spec[:name], - :output_port_key => spec[:output_port_key], - :input_port_key => spec[:input_port_key], - :output_port => output_port, - :input_port => input_port) + output_shards = output_component.shard.count + input_shards = input_component.shard.count + + in_shard_connection = output_component.shard == input_component.shard + one_to_one = output_shards == 1 && input_shards == 1 + one_to_many = output_shards == 1 && input_shards > 1 + many_to_one = output_shards > 1 && input_shards == 1 + many_to_many = output_shards > 1 && input_shards > 1 + + connection_type = many_to_many ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection + + conn = connection_type.create!(:name => spec[:name], + :output_port_key => spec[:output_port_key], + :input_port_key => spec[:input_port_key], + :output_port => output_port, + :input_port => input_port) + + # bind on the cardinality-1 side, connect on the cardinality-n side + if in_shard_connection + conn.options['output_responsibility'] = 'connect' + conn.options['input_responsibility'] = 'bind' + conn.options['output_address'] = "inproc://rflow.#{conn.uuid}" + conn.options['input_address'] = "inproc://rflow.#{conn.uuid}" + elsif many_to_one + conn.options['output_responsibility'] = 'connect' + conn.options['input_responsibility'] = 'bind' + elsif one_to_many + conn.options['output_responsibility'] = 'bind' + conn.options['input_responsibility'] = 'connect' + end + + conn.save! + conn rescue Exception => e # TODO: Figure out why an ArgumentError doesn't put the # offending message into e.message, even though it is printed # out if not caught raise RFlow::Configuration::Connection::ConnectionInvalid, "#{e.class}: #{e.message} at config '#{spec[:config_line]}'"