spec/rflow/configuration/ruby_dsl_spec.rb in rflow-1.0.0a3 vs spec/rflow/configuration/ruby_dsl_spec.rb in rflow-1.0.0a4

- old
+ new

@@ -37,29 +37,34 @@ it "should correctly process a connect declaration" do described_class.configure do |c| c.component 'first', 'First' c.component 'second', 'Second' + c.component 'third', 'Third' c.connect 'first#out' => 'second#in' c.connect 'first#out' => 'second#in[inkey]' c.connect 'first#out[outkey]' => 'second#in' c.connect 'first#out[outkey]' => 'second#in[inkey]' + c.connect 'second#out' => 'third#in', :delivery => 'broadcast' end expect(Shard).to have(1).shard - expect(Component).to have(2).components - expect(Port).to have(2).ports - expect(Connection).to have(4).connections + expect(Component).to have(3).components + expect(Port).to have(4).ports + expect(Connection).to have(5).connections - first_component = Component.where(name: 'first').first.tap do |component| - expect(component.specification).to eq('First') - expect(component).to have(0).input_ports - expect(component).to have(1).output_port - expect(component.output_ports.first.name).to eq('out') + first_component = Component.find_by_name('first').tap do |c| + expect(c.specification).to eq('First') + expect(c).to have(0).input_ports + expect(c).to have(1).output_port - expect(component.output_ports.first).to have(4).connections - component.output_ports.first.connections.tap do |connections| + out_port = c.output_ports.first + expect(out_port.name).to eq('out') + + expect(out_port).to have(4).connections + out_port.connections.tap do |connections| + connections.each {|c| expect(c.delivery).to eq 'round-robin' } expect(connections[0].input_port_key).to be_nil expect(connections[0].output_port_key).to be_nil expect(connections[1].input_port_key).to eq('inkey') expect(connections[1].output_port_key).to be_nil expect(connections[2].input_port_key).to be_nil @@ -67,19 +72,43 @@ expect(connections[3].input_port_key).to eq('inkey') expect(connections[3].output_port_key).to eq('outkey') end end - Component.where(name: 'second').first.tap do |component| - expect(component.specification).to eq('Second') - expect(component).to have(1).input_port - expect(component.input_ports.first.name).to eq('in') - expect(component).to have(0).output_ports + second_component = Component.find_by_name('second').tap do |c| + expect(c.specification).to eq('Second') + expect(c).to have(1).input_port + expect(c).to have(1).output_port - expect(component.input_ports.first).to have(4).connections - expect(component.input_ports.first.connections).to eq(first_component.output_ports.first.connections) + in_port = c.input_ports.first + expect(in_port.name).to eq('in') + + out_port = c.output_ports.first + expect(out_port.name).to eq('out') + + expect(in_port).to have(4).connections + expect(in_port.connections).to eq(first_component.output_ports.first.connections) + + expect(out_port).to have(1).connections + out_port.connections.first.tap do |connection| + expect(connection.delivery).to eq 'broadcast' + expect(connection.input_port_key).to be_nil + expect(connection.output_port_key).to be_nil + end end + + Component.find_by_name('third').tap do |c| + expect(c.specification).to eq('Third') + expect(c).to have(1).input_port + expect(c).to have(0).output_ports + + in_port = c.input_ports.first + expect(in_port.name).to eq('in') + + expect(in_port).to have(1).connections + expect(in_port.connections).to eq(second_component.output_ports.first.connections) + end end it "should correctly process shard declarations" do described_class.configure do |c| c.component 'first', 'First', :opt1 => 'opt1' @@ -308,9 +337,79 @@ conn.options.tap do |opts| expect(opts['output_socket_type']).to eq('PUSH') expect(opts['output_address']).to eq("ipc://rflow.#{conn.uuid}.in") expect(opts['output_responsibility']).to eq('connect') expect(opts['input_socket_type']).to eq('PULL') + expect(opts['input_address']).to eq("ipc://rflow.#{conn.uuid}.out") + expect(opts['input_responsibility']).to eq('connect') + end + end + end + + it "should generate PUB-SUB ipc ZeroMQ connections for one-to-many broadcast connections" do + described_class.configure do |c| + + c.shard "s1", :process => 1 do |s| + s.component 'first', 'First', :opt1 => 'opt1' + end + + c.shard "s2", :process => 3 do |s| + s.component 'second', 'Second', :opt1 => 'opt1', "opt2" => "opt2" + end + + c.connect 'first#out' => 'second#in', :delivery => 'broadcast' + end + + expect(Shard).to have(2).shards + expect(Component).to have(2).components + expect(Port).to have(2).ports + expect(Connection).to have(1).connections + + Connection.first.tap do |conn| + expect(conn.type).to eq('RFlow::Configuration::ZMQConnection') + expect(conn.name).to eq('first#out=>second#in') + expect(conn.output_port_key).to be_nil + expect(conn.input_port_key).to be_nil + conn.options.tap do |opts| + expect(opts['output_socket_type']).to eq('PUB') + expect(opts['output_address']).to eq("ipc://rflow.#{conn.uuid}") + expect(opts['output_responsibility']).to eq('bind') + expect(opts['input_socket_type']).to eq('SUB') + expect(opts['input_address']).to eq("ipc://rflow.#{conn.uuid}") + expect(opts['input_responsibility']).to eq('connect') + end + end + end + + it "should generate PUB-SUB brokered ZeroMQ connections for many-to-many broadcast connections" do + described_class.configure do |c| + + c.shard "s1", :process => 3 do |s| + s.component 'first', 'First', :opt1 => 'opt1' + end + + c.shard "s2", :process => 3 do |s| + s.component 'second', 'Second', :opt1 => 'opt1', "opt2" => "opt2" + end + + c.connect 'first#out' => 'second#in', :delivery => 'broadcast' + end + + expect(Shard).to have(2).shards + expect(Component).to have(2).components + expect(Port).to have(2).ports + expect(Connection).to have(1).connections + + Connection.first.tap do |conn| + expect(conn.type).to eq('RFlow::Configuration::BrokeredZMQConnection') + expect(conn.name).to eq('first#out=>second#in') + expect(conn.output_port_key).to be_nil + expect(conn.input_port_key).to be_nil + conn.options.tap do |opts| + expect(opts['output_socket_type']).to eq('PUB') + expect(opts['output_address']).to eq("ipc://rflow.#{conn.uuid}.in") + expect(opts['output_responsibility']).to eq('connect') + expect(opts['input_socket_type']).to eq('SUB') expect(opts['input_address']).to eq("ipc://rflow.#{conn.uuid}.out") expect(opts['input_responsibility']).to eq('connect') end end end