spec/rflow_spec.rb in rflow-0.0.5 vs spec/rflow_spec.rb in rflow-1.0.0a1
- old
+ new
@@ -1,100 +1,323 @@
require 'spec_helper.rb'
+require 'open3'
require 'rflow'
describe RFlow do
- before(:each) do
- @fixture_directory_path = File.join(File.dirname(__FILE__), 'fixtures')
+
+ before(:all) do
+ @extensions_file_name = File.join(File.dirname(__FILE__), 'fixtures', 'extensions_ints.rb')
end
-
- describe 'logger' do
- it "should initialize correctly" do
- log_file_path = File.join(@temp_directory_path, 'logfile')
- RFlow.initialize_logger log_file_path
+ context "when executing from the test script" do
- File.exist?(log_file_path).should_not be_nil
-
- RFlow.logger.error "TESTTESTTEST"
- File.read(log_file_path).should match(/TESTTESTTEST/)
-
- RFlow.close_log_file
+ before(:all) do
+ load @extensions_file_name
end
- it "should reopen correctly" do
- log_file_path = File.join(@temp_directory_path, 'logfile')
- moved_path = log_file_path + '.old'
-
- RFlow.initialize_logger log_file_path
- File.exist?(log_file_path).should be_true
- File.exist?(moved_path).should be_false
+ describe '.run' do
+ before(:each) do
+ @original_directory_path = Dir.getwd
+ @run_directory_path = File.join(@temp_directory_path, 'run')
+ @log_directory_path = File.join(@temp_directory_path, 'log')
+ Dir.mkdir @run_directory_path
+ Dir.mkdir @log_directory_path
+ end
- File.rename log_file_path, moved_path
-
- RFlow.reopen_log_file
+ after(:each) do
+ Dir.chdir @original_directory_path
+ end
- RFlow.logger.error "TESTTESTTEST"
- File.read(log_file_path).should match(/TESTTESTTEST/)
- File.read(moved_path).should_not match(/TESTTESTTEST/)
-
- RFlow.close_log_file
- end
+ def run_rflow_with_dsl(&block)
+ rflow_thread = Thread.new do
+ ActiveRecord::Base.establish_connection adapter: "sqlite3", database: ":memory:"
+ RFlow::Configuration.migrate_database
+ RFlow::Configuration::RubyDSL.configure do |c|
+ block.call(c)
+ end
- it "should toggle log level" do
+ RFlow::Configuration.merge_defaults!
+
+ RFlow.run nil, false
+ end
+
+ # TODO: figure out a way to get rid of this sleep, as there
+ # should be a better way to figure out when RFlow is done
+ sleep(2)
+
+ # Shut down the reactor and the thread
+ EM.run { EM.stop }
+ rflow_thread.join
+ end
+
+
+ it "should run a non-sharded workflow" do
+
+ run_rflow_with_dsl do |c|
+ c.setting('rflow.log_level', 'DEBUG')
+ c.setting('rflow.application_directory_path', @temp_directory_path)
+ c.setting('rflow.application_name', 'nonsharded_test')
+
+ c.component 'generate_ints', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
+ c.component 'output', 'RFlow::Components::FileOutput', 'output_file_path' => 'out'
+ c.component 'output2', 'RFlow::Components::FileOutput', 'output_file_path' => 'out2'
+ c.component 'output_even', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_even'
+ c.component 'output_odd', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_odd'
+ c.component 'output_even_odd', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_even_odd'
+ c.component 'generate_ints2', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
+ c.component 'output_even_odd2', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_even_odd2'
+
+ c.connect 'generate_ints#out' => 'output#in'
+ c.connect 'generate_ints#out' => 'output2#in'
+ c.connect 'generate_ints#even_odd_out[even]' => 'output_even#in'
+ c.connect 'generate_ints#even_odd_out[odd]' => 'output_odd#in'
+ c.connect 'generate_ints#even_odd_out' => 'output_even_odd#in'
+ c.connect 'generate_ints2#even_odd_out' => 'output_even_odd2#in'
+ end
+
+ RFlow.master.shards.count.should == 1
+ RFlow.master.shards.first.workers.count.should == 1
+
+ output_files = {
+ 'out' => [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
+ 'out2' => [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
+ 'out_even' => [20, 22, 24, 26, 28, 30],
+ 'out_odd' => [21, 23, 25, 27, 29],
+ 'out_even_odd' => [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
+ 'out_even_odd2' => [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
+ }
+
+ output_files.each do |file_name, expected_contents|
+ File.exist?(File.join(@temp_directory_path, file_name)).should be_true
+ File.readlines(file_name).map(&:to_i).should == expected_contents
+ end
+ end
+
+
+ it "should run a sharded workflow" do
+ run_rflow_with_dsl do |c|
+ c.setting('rflow.log_level', 'DEBUG')
+ c.setting('rflow.application_directory_path', @temp_directory_path)
+ c.setting('rflow.application_name', 'sharded_test')
+
+ # Instantiate components
+ c.shard 's1', :process => 3 do |s|
+ s.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
+ end
+
+ c.shard 's2', :type => :process, :count => 2 do |s|
+ s.component 'generate_ints2', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
+ end
+
+ c.component 'generate_ints3', 'RFlow::Components::GenerateIntegerSequence', 'start' => 100, 'finish' => 105
+
+ c.shard 's3', :process => 2 do |s|
+ s.component 'output1', 'RFlow::Components::FileOutput', 'output_file_path' => 'out1'
+ s.component 'output2', 'RFlow::Components::FileOutput', 'output_file_path' => 'out2'
+ end
+
+ c.component 'output3', 'RFlow::Components::FileOutput', 'output_file_path' => 'out3'
+ c.component 'output_all', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_all'
+
+ # Hook components together
+ c.connect 'generate_ints1#out' => 'output1#in'
+ c.connect 'generate_ints2#out' => 'output2#in'
+ c.connect 'generate_ints3#out' => 'output3#in'
+ c.connect 'generate_ints1#out' => 'output_all#in'
+ c.connect 'generate_ints2#out' => 'output_all#in'
+ c.connect 'generate_ints3#out' => 'output_all#in'
+ end
+
+ RFlow.master.shards.count.should == 4
+ RFlow.master.shards.map(&:count).should == [1, 3, 2, 2]
+ RFlow.master.shards.map(&:workers).map(&:count).should == [1, 3, 2, 2]
+
+ output_files = {
+ 'out1' => [0, 3, 6, 9] * 3,
+ 'out2' => (20..30).to_a * 2,
+ 'out3' => (100..105).to_a,
+ 'out_all' => [0, 3, 6, 9] * 3 + (20..30).to_a * 2 + (100..105).to_a
+ }
+
+ output_files.each do |file_name, expected_contents|
+ File.exist?(File.join(@temp_directory_path, file_name)).should be_true
+ File.readlines(file_name).map(&:to_i).sort.should == expected_contents.sort
+ end
+ end
end
end
- describe '.run' do
+ context "when executing via the rflow binary" do
before(:each) do
+ @original_directory_path = Dir.getwd
@run_directory_path = File.join(@temp_directory_path, 'run')
@log_directory_path = File.join(@temp_directory_path, 'log')
Dir.mkdir @run_directory_path
Dir.mkdir @log_directory_path
+ Dir.chdir @temp_directory_path
end
- it "should startup and run correctly with non-trivial workflow" do
- config_file_path = File.join(@fixture_directory_path, 'config_ints.rb')
- extensions_path = File.join(@fixture_directory_path, 'extensions_ints.rb')
- config_database_path = File.join(@temp_directory_path, 'config.sqlite')
+ after(:each) do
+ Dir.chdir @original_directory_path
+ end
- # Load the new database with the fixtured config file
- RFlow::Configuration::initialize_database(config_database_path, config_file_path)
- File.exist?(config_database_path).should be_true
+ def execute_rflow(rflow_args)
+ r = {}
+ r[:stdout], r[:stderr], r[:status] = Open3.capture3("bundle exec rflow #{rflow_args}")
+ r
+ end
- # Load the fixtured extensions
- load extensions_path
+ context "with a simple ruby DSL config file" do
+ before(:each) do
+ @config_file_name = 'input_config'
+ File.open('input_config', 'w+') do |file|
+ file.write <<-EOF
+ RFlow::Configuration::RubyDSL.configure do |c|
+ c.setting 'mysetting', 'myvalue'
+ end
+ EOF
+ end
+ end
- # Startup RFlow in its own thread
- rflow_thread = Thread.new do
- RFlow.run config_database_path, false
+ it "should load a ruby dsl file into a sqlite DB" do
+ db_file_name = 'outdb'
+
+ r = execute_rflow("load -d #{db_file_name} -c #{@config_file_name}")
+
+ # Make sure that the process execution worked
+ r[:status].exitstatus.should == 0
+ r[:stderr].should == ''
+ r[:stdout].should match /Successfully initialized database.*#{db_file_name}/
+
+ # Make sure the config actually got loaded
+ ActiveRecord::Base.establish_connection adapter: "sqlite3", database: db_file_name
+ RFlow::Configuration::Setting.where(:name => 'mysetting').first.value.should == 'myvalue'
end
- # TODO: figure out a way to get rid of this sleep, as there
- # should be a better way
- sleep(5)
+ it "should not load a database if the database file already exists" do
+ db_file_name = 'outdb'
+ File.open(db_file_name, 'w') { |file| file.write 'boom' }
- all_file_path = File.join(@temp_directory_path, 'out')
- all2_file_path = File.join(@temp_directory_path, 'out2')
- even_file_path = File.join(@temp_directory_path, 'out_even')
- odd_file_path = File.join(@temp_directory_path, 'out_odd')
- even_odd_file_path = File.join(@temp_directory_path, 'out_even_odd')
- even_odd2_file_path = File.join(@temp_directory_path, 'out_even_odd2')
-
- File.exist?(all_file_path).should be_true
- File.exist?(all2_file_path).should be_true
- File.exist?(even_file_path).should be_true
- File.exist?(odd_file_path).should be_true
- File.exist?(even_odd_file_path).should be_true
- File.exist?(even_odd2_file_path).should be_true
+ r = execute_rflow("load -d #{db_file_name} -c #{@config_file_name}")
- File.readlines(all_file_path).map(&:to_i).should == [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
- File.readlines(all2_file_path).map(&:to_i).should == [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
- File.readlines(even_file_path).map(&:to_i).should == [20, 22, 24, 26, 28, 30]
- File.readlines(odd_file_path).map(&:to_i).should == [21, 23, 25, 27, 29]
- File.readlines(even_odd_file_path).map(&:to_i).should == [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
- File.readlines(even_odd2_file_path).map(&:to_i).should == [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
+ # Make sure that the process execution worked
+ r[:status].exitstatus.should == 1
+ r[:stderr].should == ''
+ r[:stdout].should match /Config database.*#{db_file_name}.*exists/
+ end
+
end
- end
-
+ context "with a complex, sharded ruby DSL config file" do
+ before(:each) do
+ @config_file_name = 'input_config'
+ @db_file_name = 'config_db'
+ @app_name = 'sharded_bin_test'
+ File.open(@config_file_name, 'w+') do |file|
+ file.write <<-EOF
+ RFlow::Configuration::RubyDSL.configure do |c|
+ c.setting('rflow.log_level', 'INFO')
+ c.setting('rflow.application_directory_path', '#{@temp_directory_path}')
+ c.setting('rflow.application_name', '#{@app_name}')
+ # Instantiate components
+ c.shard 's1', :process => 3 do |s|
+ s.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', 'start' => 0, 'finish' => 10, 'step' => 3
+ end
+ c.shard 's2', :type => :process, :count => 2 do |s|
+ s.component 'generate_ints2', 'RFlow::Components::GenerateIntegerSequence', 'start' => 20, 'finish' => 30
+ end
+ c.component 'generate_ints3', 'RFlow::Components::GenerateIntegerSequence', 'start' => 100, 'finish' => 105
+ c.shard 's3', :process => 2 do |s|
+ s.component 'output1', 'RFlow::Components::FileOutput', 'output_file_path' => 'out1'
+ s.component 'output2', 'RFlow::Components::FileOutput', 'output_file_path' => 'out2'
+ end
+ c.component 'output3', 'RFlow::Components::FileOutput', 'output_file_path' => 'out3'
+ c.component 'output_all', 'RFlow::Components::FileOutput', 'output_file_path' => 'out_all'
+ # Hook components together
+ c.connect 'generate_ints1#out' => 'output1#in'
+ c.connect 'generate_ints2#out' => 'output2#in'
+ c.connect 'generate_ints3#out' => 'output3#in'
+ c.connect 'generate_ints1#out' => 'output_all#in'
+ c.connect 'generate_ints2#out' => 'output_all#in'
+ c.connect 'generate_ints3#out' => 'output_all#in'
+ end
+ EOF
+ end
+ r = execute_rflow("load -d #{@db_file_name} -c #{@config_file_name}")
+ r[:status].exitstatus.should == 0
+ r[:stderr].should == ''
+ r[:stdout].should match /Successfully initialized database.*#{@db_file_name}/
+ end
+
+ it "should not start if the components aren't loaded" do
+ r = execute_rflow("start -d #{@db_file_name} -f")
+
+ r[:status].exitstatus.should == 1
+ r[:stderr].should == ''
+ r[:stdout].should match /error/i
+ end
+
+ it "should daemonize and run in the background" do
+ r = execute_rflow("start -d #{@db_file_name} -e #{@extensions_file_name}")
+
+ r[:status].exitstatus.should == 0
+ r[:stderr].should == ''
+ r[:stdout].should_not match /error/i
+
+ sleep 1 # give the daemon a chance to finish
+
+ log_contents = File.read("log/#{@app_name}.log").chomp
+ log_lines = log_contents.split("\n")
+
+ puts '++++++++++++++++++++'
+ puts log_contents
+ puts '++++++++++++++++++++'
+
+ # Log file testing
+ log_lines.each { |line| line.should_not match /^ERROR/ }
+ log_lines.each { |line| line.should_not match /^DEBUG/ }
+
+ # Grab all the pids from the log, which seems to be the only
+ # reliable way to get them
+ log_pids = log_lines.map { |line| /\((\d+)\)/.match(line)[1].to_i }.uniq
+
+ initial_pid = r[:status].pid
+ master_pid = File.read("run/#{@app_name}.pid").chomp.to_i
+ worker_pids = log_pids - [initial_pid, master_pid]
+
+ log_pids.should include initial_pid
+ log_pids.should include master_pid
+
+ worker_pids.size.should == 8
+ worker_pids.should_not include 0
+
+ # Process checks
+ expect { Process.kill(0, initial_pid) }.to raise_error(Errno::ESRCH)
+ ([master_pid] + worker_pids).each do |pid|
+ Process.kill(0, pid).should == 1
+ end
+
+ # Output checks
+ output_files = {
+ 'out1' => [0, 3, 6, 9] * 3,
+ 'out2' => (20..30).to_a * 2,
+ 'out3' => (100..105).to_a,
+ 'out_all' => [0, 3, 6, 9] * 3 + (20..30).to_a * 2 + (100..105).to_a
+ }
+
+ output_files.each do |file_name, expected_contents|
+ File.exist?(File.join(@temp_directory_path, file_name)).should be_true
+ File.readlines(file_name).map(&:to_i).sort.should == expected_contents.sort
+ end
+
+ # Terminate the master
+ Process.kill("TERM", master_pid).should == 1
+
+ # Make sure everything is dead
+ ([master_pid] + worker_pids).each do |pid|
+ expect { Process.kill(0, pid) }.to raise_error(Errno::ESRCH)
+ end
+ end
+ end
+ end
end