lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a5 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a6
- old
+ new
@@ -188,26 +188,28 @@
input_port.save!
output_shards = output_component.shard.count
input_shards = input_component.shard.count
+ broadcast_connection = spec[:delivery] == 'broadcast'
in_shard_connection = output_component.shard == input_component.shard
one_to_one = output_shards == 1 && input_shards == 1
one_to_many = output_shards == 1 && input_shards > 1
many_to_one = output_shards > 1 && input_shards == 1
many_to_many = output_shards > 1 && input_shards > 1
- connection_type = many_to_many ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection
+ use_broker = many_to_many && (broadcast_connection || !in_shard_connection)
+ connection_type = use_broker ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection
conn = connection_type.create!(:name => spec[:name],
:delivery => spec[:delivery],
:output_port_key => spec[:output_port_key],
:input_port_key => spec[:input_port_key],
:output_port => output_port,
:input_port => input_port)
# bind on the cardinality-1 side, connect on the cardinality-n side
- if in_shard_connection
+ if in_shard_connection && !use_broker
conn.options['output_responsibility'] = 'connect'
conn.options['input_responsibility'] = 'bind'
conn.options['output_address'] = "inproc://rflow.#{conn.uuid}"
conn.options['input_address'] = "inproc://rflow.#{conn.uuid}"
elsif many_to_one