lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a3 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a4
- old
+ new
@@ -72,16 +72,18 @@
# Array ports are specified with an key suffix in standard
# progamming syntax, i.e.
# connect 'componentA#arrayport[2]' => 'componentB#in[1]'
# Uses the model to assign random UUIDs
def connect(hash)
- hash.each do |output_string, input_string|
+ delivery = hash[:delivery] || 'round-robin'
+ hash.except(:delivery).each do |output_string, input_string|
output_component_name, output_port_name, output_port_key = parse_connection_string(output_string)
input_component_name, input_port_name, input_port_key = parse_connection_string(input_string)
connection_specs << {
:name => output_string + '=>' + input_string,
+ :delivery => delivery,
:output_component_name => output_component_name,
:output_port_name => output_port_name, :output_port_key => output_port_key,
:output_string => output_string,
:input_component_name => input_component_name,
:input_port_name => input_port_name, :input_port_key => input_port_key,
@@ -195,10 +197,11 @@
many_to_many = output_shards > 1 && input_shards > 1
connection_type = many_to_many ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection
conn = connection_type.create!(:name => spec[:name],
+ :delivery => spec[:delivery],
:output_port_key => spec[:output_port_key],
:input_port_key => spec[:input_port_key],
:output_port => output_port,
:input_port => input_port)
@@ -212,9 +215,21 @@
conn.options['output_responsibility'] = 'connect'
conn.options['input_responsibility'] = 'bind'
elsif one_to_many
conn.options['output_responsibility'] = 'bind'
conn.options['input_responsibility'] = 'connect'
+ end
+
+ case spec[:delivery]
+ when 'broadcast'
+ conn.options['output_socket_type'] = 'PUB'
+ conn.options['input_socket_type'] = 'SUB'
+ when 'round-robin'
+ conn.options['output_socket_type'] = 'PUSH'
+ conn.options['input_socket_type'] = 'PULL'
+ else
+ raise RFlow::Configuration::Connection::ConnectionInvalid,
+ "Delivery type '#{spec[:delivery]}' unknown at #{spec[:config_line]}"
end
conn.save!
conn
rescue Exception => e