spec/rflow_spec.rb in rflow-1.0.0a3 vs spec/rflow_spec.rb in rflow-1.0.0a4
- old
+ new
@@ -1,12 +1,88 @@
require 'spec_helper'
require 'open3'
require 'rflow'
describe RFlow do
+ def write_config_file(content)
+ File.open(config_file_name, 'w+') {|file| file.write content }
+ end
+
+ def execute_rflow(args)
+ stdout, stderr, status = Open3.capture3("bundle exec rflow #{args}")
+ {:stdout => stdout, :stderr => stderr, :status => status}
+ end
+
+ def load_database
+ execute_rflow("load -d #{db_file_name} -c #{config_file_name}").tap do |result|
+ expect(result[:status].exitstatus).to eq(0)
+ expect(result[:stderr]).to eq('')
+ expect(result[:stdout]).to match /Successfully initialized database.*#{db_file_name}/
+ end
+ end
+
+ def start_rflow
+ execute_rflow("start -d #{db_file_name} -e #{@extensions_file_name}").tap do |result|
+ expect(result[:status].exitstatus).to eq(0)
+ expect(result[:stderr]).to eq('')
+ expect(result[:stdout]).not_to match /error/i
+ end
+ end
+
+ def get_log_pids(logfile)
+ log_contents = File.read(logfile).chomp
+ log_lines = log_contents.split("\n")
+
+ log_lines.each {|line| expect(line).not_to match /^ERROR/ }
+ log_lines.each {|line| expect(line).not_to match /^DEBUG/ }
+
+ # Grab all the pids from the log, which seems to be the only
+ # reliable way to get them
+ log_lines.map {|line| /\((\d+)\)/.match(line)[1].to_i }.uniq
+ end
+
+ def run_and_shutdown(app_name, expected_worker_count)
+ r = start_rflow
+ sleep 2 # give the daemon a chance to finish
+
+ log_pids = get_log_pids("log/#{app_name}.log")
+
+ initial_pid = r[:status].pid
+ master_pid = File.read("run/#{app_name}.pid").chomp.to_i
+ worker_pids = log_pids - [initial_pid, master_pid]
+
+ expect(log_pids).to include initial_pid
+ expect(log_pids).to include master_pid
+
+ expect(worker_pids).to have(expected_worker_count).pids
+ expect(worker_pids).not_to include 0
+
+ expect { Process.kill(0, initial_pid) }.to raise_error(Errno::ESRCH)
+ ([master_pid] + worker_pids).each do |pid|
+ expect(Process.kill(0, pid)).to eq(1)
+ end
+
+ yield # verify output
+
+ # Terminate the master
+ expect(Process.kill("TERM", master_pid)).to eq(1)
+
+ # Make sure everything is dead after a second
+ sleep 2
+ ([master_pid] + worker_pids).each do |pid|
+ expect { Process.kill(0, pid) }.to raise_error(Errno::ESRCH)
+ end
+ rescue Exception => e
+ Process.kill("TERM", master_pid) if master_pid
+ raise
+ end
+
+ let(:config_file_name) { 'input_config' }
+ let(:db_file_name) { 'outdb' }
+
before(:all) do
- @extensions_file_name = File.join(File.dirname(__FILE__), 'fixtures', 'extensions_ints.rb')
+ @extensions_file_name = File.join(File.dirname(__FILE__), 'fixtures', 'extensions.rb')
end
before(:each) do
@original_directory_path = Dir.getwd
@run_directory_path = File.join(@temp_directory_path, 'run')
@@ -41,11 +117,11 @@
rflow_thread.join
end
it "should run a non-sharded workflow" do
run_rflow_with_dsl do |c|
- c.setting 'rflow.log_level', 'FATAL'
+ c.setting 'rflow.log_level', 'ERROR'
c.setting 'rflow.application_directory_path', @temp_directory_path
c.setting 'rflow.application_name', 'nonsharded_test'
c.component 'generate_ints', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
c.component 'output', 'RFlow::Components::FileOutput', 'output_file_path' => 'out'
@@ -82,11 +158,11 @@
end
end
it "should run a sharded workflow" do
run_rflow_with_dsl do |c|
- c.setting 'rflow.log_level', 'FATAL'
+ c.setting 'rflow.log_level', 'ERROR'
c.setting 'rflow.application_directory_path', @temp_directory_path
c.setting 'rflow.application_name', 'sharded_test'
c.shard 's1', :process => 3 do |s|
s.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
@@ -128,42 +204,66 @@
output_files.each do |file_name, expected_contents|
expect(File.exist?(File.join(@temp_directory_path, file_name))).to be true
expect(File.readlines(file_name).map(&:to_i).sort).to eq(expected_contents.sort)
end
end
+
+ it "should deliver broadcast messages to every copy of a shard" do
+ run_rflow_with_dsl do |c|
+ c.setting 'rflow.log_level', 'FATAL'
+ c.setting 'rflow.application_directory_path', @temp_directory_path
+ c.setting 'rflow.application_name', 'sharded_broadcast_test'
+
+ c.shard 's1', :process => 1 do |s|
+ s.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
+ end
+
+ c.shard 's2', :process => 2 do |s|
+ s.component 'generate_ints2', 'RFlow::Components::GenerateIntegerSequence', 'start' => 1, 'finish' => 11, 'step' => 3
+ end
+
+ c.shard 's3', :type => :process, :count => 3 do |s|
+ s.component 'broadcast_output', 'RFlow::Components::FileOutput', 'output_file_path' => 'broadcast'
+ s.component 'roundrobin_output', 'RFlow::Components::FileOutput', 'output_file_path' => 'round-robin'
+ end
+
+ c.connect 'generate_ints1#out' => 'broadcast_output#in', :delivery => 'broadcast'
+ c.connect 'generate_ints2#out' => 'broadcast_output#in', :delivery => 'broadcast'
+ c.connect 'generate_ints1#out' => 'roundrobin_output#in'
+ c.connect 'generate_ints2#out' => 'roundrobin_output#in'
+ end
+
+ output_files = {
+ 'broadcast' => ([0, 3, 6, 9] * 3) + ([1, 4, 7, 10] * 6),
+ 'round-robin' => [0, 3, 6, 9] + ([1, 4, 7, 10] * 2)
+ }
+
+ expect(RFlow.master).to have(3).shards
+ expect(RFlow.master.shards.map(&:count)).to eq([1, 2, 3])
+ expect(RFlow.master.shards.map(&:workers).map(&:count)).to eq([1, 2, 3])
+
+ output_files.each do |file_name, expected_contents|
+ expect(File.exist?(File.join(@temp_directory_path, file_name))).to be true
+ expect(File.readlines(file_name).map(&:to_i).sort).to eq(expected_contents.sort)
+ end
+ end
end
end
context "when executing via the rflow binary" do
- def execute_rflow(args)
- stdout, stderr, status = Open3.capture3("bundle exec rflow #{args}")
- {:stdout => stdout, :stderr => stderr, :status => status}
- end
-
context "with a simple ruby DSL config file" do
- let(:config_file_name) { 'input_config' }
- let(:db_file_name) { 'outdb' }
-
before(:each) do
- File.open(config_file_name, 'w+') do |file|
- file.write <<-EOF
- RFlow::Configuration::RubyDSL.configure do |c|
- c.setting 'mysetting', 'myvalue'
- end
- EOF
- end
+ write_config_file <<-EOF
+ RFlow::Configuration::RubyDSL.configure do |c|
+ c.setting 'mysetting', 'myvalue'
+ end
+ EOF
end
it "should load a ruby dsl file into a sqlite DB" do
- r = execute_rflow("load -d #{db_file_name} -c #{config_file_name}")
+ load_database
- # Make sure that the process execution worked
- expect(r[:status].exitstatus).to eq(0)
- expect(r[:stderr]).to eq('')
- expect(r[:stdout]).to match /Successfully initialized database.*#{db_file_name}/
-
- # Make sure the config actually got loaded
ActiveRecord::Base.establish_connection adapter: "sqlite3", database: db_file_name
expect(RFlow::Configuration::Setting.where(:name => 'mysetting').first.value).to eq('myvalue')
end
it "should not load a database if the database file already exists" do
@@ -176,50 +276,74 @@
expect(r[:stderr]).to eq('')
expect(r[:stdout]).to match /Config database.*#{db_file_name}.*exists/
end
end
+ context "with a component that runs subshells" do
+ let(:app_name) { 'sharded_subshell_test' }
+
+ before(:each) do
+ write_config_file <<-EOF
+ RFlow::Configuration::RubyDSL.configure do |c|
+ c.setting('rflow.log_level', 'INFO')
+ c.setting('rflow.application_directory_path', '#{@temp_directory_path}')
+ c.setting('rflow.application_name', '#{app_name}')
+
+ c.component 'generate_ints', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
+ c.component 'subshell_date', 'RFlow::Components::DateShellComponent'
+ c.component 'output', 'RFlow::Components::FileOutput', 'output_file_path' => 'out1'
+
+ c.connect 'generate_ints#out' => 'subshell_date#in'
+ c.connect 'subshell_date#out' => 'output#in'
+ end
+ EOF
+
+ load_database
+ end
+
+ it "should run successfully daemonize and run in the background" do
+ run_and_shutdown app_name, 1 do # 1 default worker
+ expect(File.exist?(File.join(@temp_directory_path, 'out1'))).to be true
+ File.readlines('out1').each {|line| expect(line).to match /\w+ \w+ \d+ \d+:\d+:\d+ \w+ \d+/ }
+ end
+ end
+ end
+
context "with a complex, sharded ruby DSL config file" do
- let(:config_file_name) { 'input_config' }
- let(:db_file_name) { 'config_db' }
let(:app_name) { 'sharded_bin_test' }
before(:each) do
- File.open(config_file_name, 'w+') do |file|
- file.write <<-EOF
- RFlow::Configuration::RubyDSL.configure do |c|
- c.setting('rflow.log_level', 'INFO')
- c.setting('rflow.application_directory_path', '#{@temp_directory_path}')
- c.setting('rflow.application_name', '#{app_name}')
+ write_config_file <<-EOF
+ RFlow::Configuration::RubyDSL.configure do |c|
+ c.setting('rflow.log_level', 'INFO')
+ c.setting('rflow.application_directory_path', '#{@temp_directory_path}')
+ c.setting('rflow.application_name', '#{app_name}')
- c.shard 's1', :process => 3 do |s|
- s.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
- end
- c.shard 's2', :type => :process, :count => 2 do |s|
- s.component 'generate_ints2', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
- end
- c.component 'generate_ints3', 'RFlow::Components::GenerateIntegerSequence', 'start' => 100, 'finish' => 105
- c.shard 's3', :process => 2 do |s|
- s.component 'output1', 'RFlow::Components::FileOutput', 'output_file_path' => 'out1'
- s.component 'output2', 'RFlow::Components::FileOutput', 'output_file_path' => 'out2'
- end
- c.component 'output3', 'RFlow::Components::FileOutput', 'output_file_path' => 'out3'
- c.component 'output_all', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_all'
-
- c.connect 'generate_ints1#out' => 'output1#in'
- c.connect 'generate_ints2#out' => 'output2#in'
- c.connect 'generate_ints3#out' => 'output3#in'
- c.connect 'generate_ints1#out' => 'output_all#in'
- c.connect 'generate_ints2#out' => 'output_all#in'
- c.connect 'generate_ints3#out' => 'output_all#in'
+ c.shard 's1', :process => 3 do |s|
+ s.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
end
- EOF
- end
- r = execute_rflow("load -d #{db_file_name} -c #{config_file_name}")
- expect(r[:status].exitstatus).to eq(0)
- expect(r[:stderr]).to eq('')
- expect(r[:stdout]).to match /Successfully initialized database.*#{db_file_name}/
+ c.shard 's2', :type => :process, :count => 2 do |s|
+ s.component 'generate_ints2', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
+ end
+ c.component 'generate_ints3', 'RFlow::Components::GenerateIntegerSequence', 'start' => 100, 'finish' => 105
+ c.shard 's3', :process => 2 do |s|
+ s.component 'output1', 'RFlow::Components::FileOutput', 'output_file_path' => 'out1'
+ s.component 'output2', 'RFlow::Components::FileOutput', 'output_file_path' => 'out2'
+ end
+ c.component 'output3', 'RFlow::Components::FileOutput', 'output_file_path' => 'out3'
+ c.component 'output_all', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_all'
+
+ c.connect 'generate_ints1#out' => 'output1#in'
+ c.connect 'generate_ints2#out' => 'output2#in'
+ c.connect 'generate_ints3#out' => 'output3#in'
+ c.connect 'generate_ints1#out' => 'output_all#in'
+ c.connect 'generate_ints2#out' => 'output_all#in'
+ c.connect 'generate_ints3#out' => 'output_all#in'
+ end
+ EOF
+
+ load_database
end
it "should not start if the components aren't loaded" do
r = execute_rflow("start -d #{db_file_name} -f")
@@ -227,66 +351,21 @@
expect(r[:stderr]).to eq('')
expect(r[:stdout]).to match /error/i
end
it "should daemonize and run in the background" do
- begin
- r = execute_rflow("start -d #{db_file_name} -e #{@extensions_file_name}")
+ output_files = {
+ 'out1' => [0, 3, 6, 9] * 3,
+ 'out2' => (20..30).to_a * 2,
+ 'out3' => (100..105).to_a,
+ 'out_all' => [0, 3, 6, 9] * 3 + (20..30).to_a * 2 + (100..105).to_a
+ }
- expect(r[:status].exitstatus).to eq(0)
- expect(r[:stderr]).to eq('')
- expect(r[:stdout]).not_to match /error/i
-
- sleep 2 # give the daemon a chance to finish
-
- log_contents = File.read("log/#{app_name}.log").chomp
- log_lines = log_contents.split("\n")
-
- log_lines.each {|line| expect(line).not_to match /^ERROR/ }
- log_lines.each {|line| expect(line).not_to match /^DEBUG/ }
-
- # Grab all the pids from the log, which seems to be the only
- # reliable way to get them
- log_pids = log_lines.map {|line| /\((\d+)\)/.match(line)[1].to_i }.uniq
-
- initial_pid = r[:status].pid
- master_pid = File.read("run/#{app_name}.pid").chomp.to_i
- worker_pids = log_pids - [initial_pid, master_pid]
-
- expect(log_pids).to include initial_pid
- expect(log_pids).to include master_pid
-
- expect(worker_pids).to have(10).pids # 1+3+2+2 workers, 2 brokers
- expect(worker_pids).not_to include 0
-
- expect { Process.kill(0, initial_pid) }.to raise_error(Errno::ESRCH)
- ([master_pid] + worker_pids).each do |pid|
- expect(Process.kill(0, pid)).to eq(1)
- end
-
- output_files = {
- 'out1' => [0, 3, 6, 9] * 3,
- 'out2' => (20..30).to_a * 2,
- 'out3' => (100..105).to_a,
- 'out_all' => [0, 3, 6, 9] * 3 + (20..30).to_a * 2 + (100..105).to_a
- }
-
+ run_and_shutdown app_name, 10 do # 1+3+2+2 workers, 2 brokers
output_files.each do |file_name, expected_contents|
expect(File.exist?(File.join(@temp_directory_path, file_name))).to be true
expect(File.readlines(file_name).map(&:to_i).sort).to eq(expected_contents.sort)
end
-
- # Terminate the master
- expect(Process.kill("TERM", master_pid)).to eq(1)
-
- # Make sure everything is dead after a second
- sleep 2
- ([master_pid] + worker_pids).each do |pid|
- expect { Process.kill(0, pid) }.to raise_error(Errno::ESRCH)
- end
- rescue Exception => e
- Process.kill("TERM", master_pid) if master_pid
- raise
end
end
end
end
end