spec/rflow/configuration/ruby_dsl_spec.rb in rflow-1.0.0a3 vs spec/rflow/configuration/ruby_dsl_spec.rb in rflow-1.0.0a4
- old
+ new
@@ -37,29 +37,34 @@
it "should correctly process a connect declaration" do
described_class.configure do |c|
c.component 'first', 'First'
c.component 'second', 'Second'
+ c.component 'third', 'Third'
c.connect 'first#out' => 'second#in'
c.connect 'first#out' => 'second#in[inkey]'
c.connect 'first#out[outkey]' => 'second#in'
c.connect 'first#out[outkey]' => 'second#in[inkey]'
+ c.connect 'second#out' => 'third#in', :delivery => 'broadcast'
end
expect(Shard).to have(1).shard
- expect(Component).to have(2).components
- expect(Port).to have(2).ports
- expect(Connection).to have(4).connections
+ expect(Component).to have(3).components
+ expect(Port).to have(4).ports
+ expect(Connection).to have(5).connections
- first_component = Component.where(name: 'first').first.tap do |component|
- expect(component.specification).to eq('First')
- expect(component).to have(0).input_ports
- expect(component).to have(1).output_port
- expect(component.output_ports.first.name).to eq('out')
+ first_component = Component.find_by_name('first').tap do |c|
+ expect(c.specification).to eq('First')
+ expect(c).to have(0).input_ports
+ expect(c).to have(1).output_port
- expect(component.output_ports.first).to have(4).connections
- component.output_ports.first.connections.tap do |connections|
+ out_port = c.output_ports.first
+ expect(out_port.name).to eq('out')
+
+ expect(out_port).to have(4).connections
+ out_port.connections.tap do |connections|
+ connections.each {|c| expect(c.delivery).to eq 'round-robin' }
expect(connections[0].input_port_key).to be_nil
expect(connections[0].output_port_key).to be_nil
expect(connections[1].input_port_key).to eq('inkey')
expect(connections[1].output_port_key).to be_nil
expect(connections[2].input_port_key).to be_nil
@@ -67,19 +72,43 @@
expect(connections[3].input_port_key).to eq('inkey')
expect(connections[3].output_port_key).to eq('outkey')
end
end
- Component.where(name: 'second').first.tap do |component|
- expect(component.specification).to eq('Second')
- expect(component).to have(1).input_port
- expect(component.input_ports.first.name).to eq('in')
- expect(component).to have(0).output_ports
+ second_component = Component.find_by_name('second').tap do |c|
+ expect(c.specification).to eq('Second')
+ expect(c).to have(1).input_port
+ expect(c).to have(1).output_port
- expect(component.input_ports.first).to have(4).connections
- expect(component.input_ports.first.connections).to eq(first_component.output_ports.first.connections)
+ in_port = c.input_ports.first
+ expect(in_port.name).to eq('in')
+
+ out_port = c.output_ports.first
+ expect(out_port.name).to eq('out')
+
+ expect(in_port).to have(4).connections
+ expect(in_port.connections).to eq(first_component.output_ports.first.connections)
+
+ expect(out_port).to have(1).connections
+ out_port.connections.first.tap do |connection|
+ expect(connection.delivery).to eq 'broadcast'
+ expect(connection.input_port_key).to be_nil
+ expect(connection.output_port_key).to be_nil
+ end
end
+
+ Component.find_by_name('third').tap do |c|
+ expect(c.specification).to eq('Third')
+ expect(c).to have(1).input_port
+ expect(c).to have(0).output_ports
+
+ in_port = c.input_ports.first
+ expect(in_port.name).to eq('in')
+
+ expect(in_port).to have(1).connections
+ expect(in_port.connections).to eq(second_component.output_ports.first.connections)
+ end
end
it "should correctly process shard declarations" do
described_class.configure do |c|
c.component 'first', 'First', :opt1 => 'opt1'
@@ -308,9 +337,79 @@
conn.options.tap do |opts|
expect(opts['output_socket_type']).to eq('PUSH')
expect(opts['output_address']).to eq("ipc://rflow.#{conn.uuid}.in")
expect(opts['output_responsibility']).to eq('connect')
expect(opts['input_socket_type']).to eq('PULL')
+ expect(opts['input_address']).to eq("ipc://rflow.#{conn.uuid}.out")
+ expect(opts['input_responsibility']).to eq('connect')
+ end
+ end
+ end
+
+ it "should generate PUB-SUB ipc ZeroMQ connections for one-to-many broadcast connections" do
+ described_class.configure do |c|
+
+ c.shard "s1", :process => 1 do |s|
+ s.component 'first', 'First', :opt1 => 'opt1'
+ end
+
+ c.shard "s2", :process => 3 do |s|
+ s.component 'second', 'Second', :opt1 => 'opt1', "opt2" => "opt2"
+ end
+
+ c.connect 'first#out' => 'second#in', :delivery => 'broadcast'
+ end
+
+ expect(Shard).to have(2).shards
+ expect(Component).to have(2).components
+ expect(Port).to have(2).ports
+ expect(Connection).to have(1).connections
+
+ Connection.first.tap do |conn|
+ expect(conn.type).to eq('RFlow::Configuration::ZMQConnection')
+ expect(conn.name).to eq('first#out=>second#in')
+ expect(conn.output_port_key).to be_nil
+ expect(conn.input_port_key).to be_nil
+ conn.options.tap do |opts|
+ expect(opts['output_socket_type']).to eq('PUB')
+ expect(opts['output_address']).to eq("ipc://rflow.#{conn.uuid}")
+ expect(opts['output_responsibility']).to eq('bind')
+ expect(opts['input_socket_type']).to eq('SUB')
+ expect(opts['input_address']).to eq("ipc://rflow.#{conn.uuid}")
+ expect(opts['input_responsibility']).to eq('connect')
+ end
+ end
+ end
+
+ it "should generate PUB-SUB brokered ZeroMQ connections for many-to-many broadcast connections" do
+ described_class.configure do |c|
+
+ c.shard "s1", :process => 3 do |s|
+ s.component 'first', 'First', :opt1 => 'opt1'
+ end
+
+ c.shard "s2", :process => 3 do |s|
+ s.component 'second', 'Second', :opt1 => 'opt1', "opt2" => "opt2"
+ end
+
+ c.connect 'first#out' => 'second#in', :delivery => 'broadcast'
+ end
+
+ expect(Shard).to have(2).shards
+ expect(Component).to have(2).components
+ expect(Port).to have(2).ports
+ expect(Connection).to have(1).connections
+
+ Connection.first.tap do |conn|
+ expect(conn.type).to eq('RFlow::Configuration::BrokeredZMQConnection')
+ expect(conn.name).to eq('first#out=>second#in')
+ expect(conn.output_port_key).to be_nil
+ expect(conn.input_port_key).to be_nil
+ conn.options.tap do |opts|
+ expect(opts['output_socket_type']).to eq('PUB')
+ expect(opts['output_address']).to eq("ipc://rflow.#{conn.uuid}.in")
+ expect(opts['output_responsibility']).to eq('connect')
+ expect(opts['input_socket_type']).to eq('SUB')
expect(opts['input_address']).to eq("ipc://rflow.#{conn.uuid}.out")
expect(opts['input_responsibility']).to eq('connect')
end
end
end