lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a3 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a4

- old
+ new

@@ -72,16 +72,18 @@ # Array ports are specified with an key suffix in standard # progamming syntax, i.e. # connect 'componentA#arrayport[2]' => 'componentB#in[1]' # Uses the model to assign random UUIDs def connect(hash) - hash.each do |output_string, input_string| + delivery = hash[:delivery] || 'round-robin' + hash.except(:delivery).each do |output_string, input_string| output_component_name, output_port_name, output_port_key = parse_connection_string(output_string) input_component_name, input_port_name, input_port_key = parse_connection_string(input_string) connection_specs << { :name => output_string + '=>' + input_string, + :delivery => delivery, :output_component_name => output_component_name, :output_port_name => output_port_name, :output_port_key => output_port_key, :output_string => output_string, :input_component_name => input_component_name, :input_port_name => input_port_name, :input_port_key => input_port_key, @@ -195,10 +197,11 @@ many_to_many = output_shards > 1 && input_shards > 1 connection_type = many_to_many ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection conn = connection_type.create!(:name => spec[:name], + :delivery => spec[:delivery], :output_port_key => spec[:output_port_key], :input_port_key => spec[:input_port_key], :output_port => output_port, :input_port => input_port) @@ -212,9 +215,21 @@ conn.options['output_responsibility'] = 'connect' conn.options['input_responsibility'] = 'bind' elsif one_to_many conn.options['output_responsibility'] = 'bind' conn.options['input_responsibility'] = 'connect' + end + + case spec[:delivery] + when 'broadcast' + conn.options['output_socket_type'] = 'PUB' + conn.options['input_socket_type'] = 'SUB' + when 'round-robin' + conn.options['output_socket_type'] = 'PUSH' + conn.options['input_socket_type'] = 'PULL' + else + raise RFlow::Configuration::Connection::ConnectionInvalid, + "Delivery type '#{spec[:delivery]}' unknown at #{spec[:config_line]}" end conn.save! conn rescue Exception => e