spec/rflow_spec.rb in rflow-1.0.0a1 vs spec/rflow_spec.rb in rflow-1.0.0a2
- old
+ new
@@ -1,64 +1,53 @@
-require 'spec_helper.rb'
-
+require 'spec_helper'
require 'open3'
require 'rflow'
describe RFlow do
-
before(:all) do
@extensions_file_name = File.join(File.dirname(__FILE__), 'fixtures', 'extensions_ints.rb')
end
- context "when executing from the test script" do
+ before(:each) do
+ @original_directory_path = Dir.getwd
+ @run_directory_path = File.join(@temp_directory_path, 'run')
+ @log_directory_path = File.join(@temp_directory_path, 'log')
+ Dir.mkdir @run_directory_path
+ Dir.mkdir @log_directory_path
+ Dir.chdir @temp_directory_path
+ end
- before(:all) do
- load @extensions_file_name
- end
+ after(:each) { Dir.chdir @original_directory_path }
- describe '.run' do
- before(:each) do
- @original_directory_path = Dir.getwd
- @run_directory_path = File.join(@temp_directory_path, 'run')
- @log_directory_path = File.join(@temp_directory_path, 'log')
- Dir.mkdir @run_directory_path
- Dir.mkdir @log_directory_path
- end
+ context "when executing from the test script" do
+ before(:all) { load @extensions_file_name }
- after(:each) do
- Dir.chdir @original_directory_path
- end
-
+ describe '.run!' do
def run_rflow_with_dsl(&block)
rflow_thread = Thread.new do
ActiveRecord::Base.establish_connection adapter: "sqlite3", database: ":memory:"
RFlow::Configuration.migrate_database
- RFlow::Configuration::RubyDSL.configure do |c|
- block.call(c)
- end
-
+ RFlow::Configuration::RubyDSL.configure {|c| block.call(c) }
RFlow::Configuration.merge_defaults!
-
- RFlow.run nil, false
+ RFlow.run! nil, false
end
# TODO: figure out a way to get rid of this sleep, as there
# should be a better way to figure out when RFlow is done
- sleep(2)
+ sleep(5)
- # Shut down the reactor and the thread
+ # Shut down the workers, the reactor, and the thread
+ RFlow.master.shutdown! 'SIGQUIT'
EM.run { EM.stop }
rflow_thread.join
end
-
it "should run a non-sharded workflow" do
-
run_rflow_with_dsl do |c|
- c.setting('rflow.log_level', 'DEBUG')
- c.setting('rflow.application_directory_path', @temp_directory_path)
- c.setting('rflow.application_name', 'nonsharded_test')
+ c.setting 'rflow.log_level', 'FATAL'
+ c.setting 'rflow.application_directory_path', @temp_directory_path
+ c.setting 'rflow.application_name', 'nonsharded_test'
c.component 'generate_ints', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
c.component 'output', 'RFlow::Components::FileOutput', 'output_file_path' => 'out'
c.component 'output2', 'RFlow::Components::FileOutput', 'output_file_path' => 'out2'
c.component 'output_even', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_even'
@@ -73,12 +62,12 @@
c.connect 'generate_ints#even_odd_out[odd]' => 'output_odd#in'
c.connect 'generate_ints#even_odd_out' => 'output_even_odd#in'
c.connect 'generate_ints2#even_odd_out' => 'output_even_odd2#in'
end
- RFlow.master.shards.count.should == 1
- RFlow.master.shards.first.workers.count.should == 1
+ RFlow.master.should have(1).shard
+ RFlow.master.shards.first.should have(1).worker
output_files = {
'out' => [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
'out2' => [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
'out_even' => [20, 22, 24, 26, 28, 30],
@@ -86,23 +75,21 @@
'out_even_odd' => [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
'out_even_odd2' => [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
}
output_files.each do |file_name, expected_contents|
- File.exist?(File.join(@temp_directory_path, file_name)).should be_true
+ File.exist?(File.join(@temp_directory_path, file_name)).should be true
File.readlines(file_name).map(&:to_i).should == expected_contents
end
end
-
it "should run a sharded workflow" do
run_rflow_with_dsl do |c|
- c.setting('rflow.log_level', 'DEBUG')
- c.setting('rflow.application_directory_path', @temp_directory_path)
- c.setting('rflow.application_name', 'sharded_test')
+ c.setting 'rflow.log_level', 'FATAL'
+ c.setting 'rflow.application_directory_path', @temp_directory_path
+ c.setting 'rflow.application_name', 'sharded_test'
- # Instantiate components
c.shard 's1', :process => 3 do |s|
s.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
end
c.shard 's2', :type => :process, :count => 2 do |s|
@@ -117,20 +104,19 @@
end
c.component 'output3', 'RFlow::Components::FileOutput', 'output_file_path' => 'out3'
c.component 'output_all', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_all'
- # Hook components together
c.connect 'generate_ints1#out' => 'output1#in'
c.connect 'generate_ints2#out' => 'output2#in'
c.connect 'generate_ints3#out' => 'output3#in'
c.connect 'generate_ints1#out' => 'output_all#in'
c.connect 'generate_ints2#out' => 'output_all#in'
c.connect 'generate_ints3#out' => 'output_all#in'
end
- RFlow.master.shards.count.should == 4
+ RFlow.master.should have(4).shards
RFlow.master.shards.map(&:count).should == [1, 3, 2, 2]
RFlow.master.shards.map(&:workers).map(&:count).should == [1, 3, 2, 2]
output_files = {
'out1' => [0, 3, 6, 9] * 3,
@@ -138,54 +124,40 @@
'out3' => (100..105).to_a,
'out_all' => [0, 3, 6, 9] * 3 + (20..30).to_a * 2 + (100..105).to_a
}
output_files.each do |file_name, expected_contents|
- File.exist?(File.join(@temp_directory_path, file_name)).should be_true
+ File.exist?(File.join(@temp_directory_path, file_name)).should be true
File.readlines(file_name).map(&:to_i).sort.should == expected_contents.sort
end
end
end
end
context "when executing via the rflow binary" do
- before(:each) do
- @original_directory_path = Dir.getwd
- @run_directory_path = File.join(@temp_directory_path, 'run')
- @log_directory_path = File.join(@temp_directory_path, 'log')
- Dir.mkdir @run_directory_path
- Dir.mkdir @log_directory_path
- Dir.chdir @temp_directory_path
+ def execute_rflow(args)
+ stdout, stderr, status = Open3.capture3("bundle exec rflow #{args}")
+ {:stdout => stdout, :stderr => stderr, :status => status}
end
- after(:each) do
- Dir.chdir @original_directory_path
- end
-
- def execute_rflow(rflow_args)
- r = {}
- r[:stdout], r[:stderr], r[:status] = Open3.capture3("bundle exec rflow #{rflow_args}")
- r
- end
-
context "with a simple ruby DSL config file" do
+ let(:config_file_name) { 'input_config' }
+ let(:db_file_name) { 'outdb' }
+
before(:each) do
- @config_file_name = 'input_config'
- File.open('input_config', 'w+') do |file|
+ File.open(config_file_name, 'w+') do |file|
file.write <<-EOF
RFlow::Configuration::RubyDSL.configure do |c|
c.setting 'mysetting', 'myvalue'
end
EOF
end
end
it "should load a ruby dsl file into a sqlite DB" do
- db_file_name = 'outdb'
+ r = execute_rflow("load -d #{db_file_name} -c #{config_file_name}")
- r = execute_rflow("load -d #{db_file_name} -c #{@config_file_name}")
-
# Make sure that the process execution worked
r[:status].exitstatus.should == 0
r[:stderr].should == ''
r[:stdout].should match /Successfully initialized database.*#{db_file_name}/
@@ -193,35 +165,34 @@
ActiveRecord::Base.establish_connection adapter: "sqlite3", database: db_file_name
RFlow::Configuration::Setting.where(:name => 'mysetting').first.value.should == 'myvalue'
end
it "should not load a database if the database file already exists" do
- db_file_name = 'outdb'
- File.open(db_file_name, 'w') { |file| file.write 'boom' }
+ File.open(db_file_name, 'w') {|file| file.write 'boom' }
- r = execute_rflow("load -d #{db_file_name} -c #{@config_file_name}")
+ r = execute_rflow("load -d #{db_file_name} -c #{config_file_name}")
# Make sure that the process execution worked
r[:status].exitstatus.should == 1
r[:stderr].should == ''
r[:stdout].should match /Config database.*#{db_file_name}.*exists/
end
-
end
context "with a complex, sharded ruby DSL config file" do
+ let(:config_file_name) { 'input_config' }
+ let(:db_file_name) { 'config_db' }
+ let(:app_name) { 'sharded_bin_test' }
+
before(:each) do
- @config_file_name = 'input_config'
- @db_file_name = 'config_db'
- @app_name = 'sharded_bin_test'
- File.open(@config_file_name, 'w+') do |file|
+ File.open(config_file_name, 'w+') do |file|
file.write <<-EOF
RFlow::Configuration::RubyDSL.configure do |c|
c.setting('rflow.log_level', 'INFO')
c.setting('rflow.application_directory_path', '#{@temp_directory_path}')
- c.setting('rflow.application_name', '#{@app_name}')
- # Instantiate components
+ c.setting('rflow.application_name', '#{app_name}')
+
c.shard 's1', :process => 3 do |s|
s.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
end
c.shard 's2', :type => :process, :count => 2 do |s|
s.component 'generate_ints2', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
@@ -231,92 +202,91 @@
s.component 'output1', 'RFlow::Components::FileOutput', 'output_file_path' => 'out1'
s.component 'output2', 'RFlow::Components::FileOutput', 'output_file_path' => 'out2'
end
c.component 'output3', 'RFlow::Components::FileOutput', 'output_file_path' => 'out3'
c.component 'output_all', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_all'
- # Hook components together
+
c.connect 'generate_ints1#out' => 'output1#in'
c.connect 'generate_ints2#out' => 'output2#in'
c.connect 'generate_ints3#out' => 'output3#in'
c.connect 'generate_ints1#out' => 'output_all#in'
c.connect 'generate_ints2#out' => 'output_all#in'
c.connect 'generate_ints3#out' => 'output_all#in'
end
EOF
end
- r = execute_rflow("load -d #{@db_file_name} -c #{@config_file_name}")
+ r = execute_rflow("load -d #{db_file_name} -c #{config_file_name}")
r[:status].exitstatus.should == 0
r[:stderr].should == ''
- r[:stdout].should match /Successfully initialized database.*#{@db_file_name}/
+ r[:stdout].should match /Successfully initialized database.*#{db_file_name}/
end
it "should not start if the components aren't loaded" do
- r = execute_rflow("start -d #{@db_file_name} -f")
+ r = execute_rflow("start -d #{db_file_name} -f")
r[:status].exitstatus.should == 1
r[:stderr].should == ''
r[:stdout].should match /error/i
end
it "should daemonize and run in the background" do
- r = execute_rflow("start -d #{@db_file_name} -e #{@extensions_file_name}")
+ begin
+ r = execute_rflow("start -d #{db_file_name} -e #{@extensions_file_name}")
- r[:status].exitstatus.should == 0
- r[:stderr].should == ''
- r[:stdout].should_not match /error/i
+ r[:status].exitstatus.should == 0
+ r[:stderr].should == ''
+ r[:stdout].should_not match /error/i
- sleep 1 # give the daemon a chance to finish
+ sleep 2 # give the daemon a chance to finish
- log_contents = File.read("log/#{@app_name}.log").chomp
- log_lines = log_contents.split("\n")
+ log_contents = File.read("log/#{app_name}.log").chomp
+ log_lines = log_contents.split("\n")
- puts '++++++++++++++++++++'
- puts log_contents
- puts '++++++++++++++++++++'
+ log_lines.each {|line| line.should_not match /^ERROR/ }
+ log_lines.each {|line| line.should_not match /^DEBUG/ }
- # Log file testing
- log_lines.each { |line| line.should_not match /^ERROR/ }
- log_lines.each { |line| line.should_not match /^DEBUG/ }
+ # Grab all the pids from the log, which seems to be the only
+ # reliable way to get them
+ log_pids = log_lines.map {|line| /\((\d+)\)/.match(line)[1].to_i }.uniq
- # Grab all the pids from the log, which seems to be the only
- # reliable way to get them
- log_pids = log_lines.map { |line| /\((\d+)\)/.match(line)[1].to_i }.uniq
+ initial_pid = r[:status].pid
+ master_pid = File.read("run/#{app_name}.pid").chomp.to_i
+ worker_pids = log_pids - [initial_pid, master_pid]
- initial_pid = r[:status].pid
- master_pid = File.read("run/#{@app_name}.pid").chomp.to_i
- worker_pids = log_pids - [initial_pid, master_pid]
+ log_pids.should include initial_pid
+ log_pids.should include master_pid
- log_pids.should include initial_pid
- log_pids.should include master_pid
+ worker_pids.should have(8).pids
+ worker_pids.should_not include 0
- worker_pids.size.should == 8
- worker_pids.should_not include 0
+ expect { Process.kill(0, initial_pid) }.to raise_error(Errno::ESRCH)
+ ([master_pid] + worker_pids).each do |pid|
+ Process.kill(0, pid).should == 1
+ end
- # Process checks
- expect { Process.kill(0, initial_pid) }.to raise_error(Errno::ESRCH)
- ([master_pid] + worker_pids).each do |pid|
- Process.kill(0, pid).should == 1
- end
+ output_files = {
+ 'out1' => [0, 3, 6, 9] * 3,
+ 'out2' => (20..30).to_a * 2,
+ 'out3' => (100..105).to_a,
+ 'out_all' => [0, 3, 6, 9] * 3 + (20..30).to_a * 2 + (100..105).to_a
+ }
- # Output checks
- output_files = {
- 'out1' => [0, 3, 6, 9] * 3,
- 'out2' => (20..30).to_a * 2,
- 'out3' => (100..105).to_a,
- 'out_all' => [0, 3, 6, 9] * 3 + (20..30).to_a * 2 + (100..105).to_a
- }
+ output_files.each do |file_name, expected_contents|
+ File.exist?(File.join(@temp_directory_path, file_name)).should be true
+ File.readlines(file_name).map(&:to_i).sort.should == expected_contents.sort
+ end
- output_files.each do |file_name, expected_contents|
- File.exist?(File.join(@temp_directory_path, file_name)).should be_true
- File.readlines(file_name).map(&:to_i).sort.should == expected_contents.sort
- end
+ # Terminate the master
+ Process.kill("TERM", master_pid).should == 1
- # Terminate the master
- Process.kill("TERM", master_pid).should == 1
-
- # Make sure everything is dead
- ([master_pid] + worker_pids).each do |pid|
- expect { Process.kill(0, pid) }.to raise_error(Errno::ESRCH)
+ # Make sure everything is dead after a second
+ sleep 1
+ ([master_pid] + worker_pids).each do |pid|
+ expect { Process.kill(0, pid) }.to raise_error(Errno::ESRCH)
+ end
+ rescue Exception => e
+ Process.kill("TERM", master_pid) if master_pid
+ raise
end
end
end
end
end