lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a5 vs lib/rflow/configuration/ruby_dsl.rb in rflow-1.0.0a6

- old
+ new

@@ -188,26 +188,28 @@ input_port.save! output_shards = output_component.shard.count input_shards = input_component.shard.count + broadcast_connection = spec[:delivery] == 'broadcast' in_shard_connection = output_component.shard == input_component.shard one_to_one = output_shards == 1 && input_shards == 1 one_to_many = output_shards == 1 && input_shards > 1 many_to_one = output_shards > 1 && input_shards == 1 many_to_many = output_shards > 1 && input_shards > 1 - connection_type = many_to_many ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection + use_broker = many_to_many && (broadcast_connection || !in_shard_connection) + connection_type = use_broker ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection conn = connection_type.create!(:name => spec[:name], :delivery => spec[:delivery], :output_port_key => spec[:output_port_key], :input_port_key => spec[:input_port_key], :output_port => output_port, :input_port => input_port) # bind on the cardinality-1 side, connect on the cardinality-n side - if in_shard_connection + if in_shard_connection && !use_broker conn.options['output_responsibility'] = 'connect' conn.options['input_responsibility'] = 'bind' conn.options['output_address'] = "inproc://rflow.#{conn.uuid}" conn.options['input_address'] = "inproc://rflow.#{conn.uuid}" elsif many_to_one