Sha256: b10c5e305fe6b577f06cc437702aad6e0b4a4612d7e36c4b777b2b1bb2c0e5eb
Contents?: true
Size: 1.89 KB
Versions: 2
Compression:
Stored size: 1.89 KB
Contents
#
# testing ruote-beanstalk
#
# Mon Jun 14 16:11:02 JST 2010
#

require File.join(File.dirname(__FILE__), 'base')

# Functional test for Ruote::Beanstalk::ParticipantProxy.
#
# Each test registers the proxy as participant :alpha, launches a one-step
# process that invokes it, and checks which beanstalk tube the resulting job
# shows up on.
class FtParticipantTest < Test::Unit::TestCase

  # Calls Ruote::Beanstalk.fork for 127.0.0.1:11300 (keeping the returned
  # value as the pid to kill in teardown) and builds a ruote engine backed by
  # an in-memory HashStorage.
  def setup

    @bs_pid = Ruote::Beanstalk.fork(
      :address => '127.0.0.1',
      :port => 11300,
      :no_kill_at_exit => true,
      :quiet => true)

    # NOTE(review): presumably gives the forked server time to start
    # accepting connections -- confirm.
    sleep 0.100

    @engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new()))
  end

  # Kills (signal 9) the process whose pid was recorded in setup, if any.
  def teardown

    Process.kill(9, @bs_pid) if @bs_pid
  end

  # Test helper: connects to beanstalk on 127.0.0.1 and, in a background
  # thread, reserves jobs one after the other, deletes each of them and
  # stores its JSON-decoded body in #jobs.
  class Watcher

    # Decoded bodies of the jobs seen so far, in reservation order.
    attr_reader :jobs

    # port:: port of the beanstalk server on 127.0.0.1
    # tube:: passed as second argument to Beanstalk::Connection.new
    #        (nil by default)
    def initialize(port, tube=nil)

      @connection = ::Beanstalk::Connection.new("127.0.0.1:#{port}", tube)
      @jobs = []

      @thread = Thread.new do
        begin
          loop do
            job = @connection.reserve
            job.delete
            @jobs << Rufus::Json.decode(job.body)
          end
        rescue StandardError
          # Deliberately silent: any error simply ends the watcher loop
          # (presumably this happens when the server is killed in teardown).
          # Only StandardError is rescued so that SystemExit, signals and
          # the like are not swallowed. Bind the exception and print it
          # here when debugging.
        end
      end
    end
  end

  # A participant registered without a 'tube' option: the watcher (created
  # without a tube argument) must see exactly one job, whose decoded body
  # starts with 'workitem'.
  def test_participant

    @engine.register_participant(
      :alpha,
      Ruote::Beanstalk::ParticipantProxy,
      'beanstalk' => '127.0.0.1:11300')

    watcher = Watcher.new(11300)

    #@engine.context.logger.noisy = true

    @engine.launch(Ruote.define do
      alpha
    end)

    @engine.wait_for(:alpha)

    # leave the watcher thread some time to reserve and decode the job
    sleep 0.100

    assert_equal 1, watcher.jobs.size
    assert_equal 'workitem', watcher.jobs.first.first
  end

  # A participant registered with 'tube' => 'underground': the job must be
  # seen by the watcher bound to that tube and not by the watcher created
  # without a tube argument.
  def test_participant_tube

    @engine.register_participant(
      :alpha,
      Ruote::Beanstalk::ParticipantProxy,
      'beanstalk' => '127.0.0.1:11300',
      'tube' => 'underground')

    watcher0 = Watcher.new(11300)
    watcher1 = Watcher.new(11300, 'underground')

    #@engine.context.logger.noisy = true

    @engine.launch(Ruote.define do
      alpha
    end)

    @engine.wait_for(:alpha)

    # leave the watcher threads some time to reserve and decode the job
    sleep 0.100

    assert_equal 0, watcher0.jobs.size
    assert_equal 1, watcher1.jobs.size
    assert_equal 'workitem', watcher1.jobs.first.first
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
ruote-beanstalk-2.2.0 | test/functional/ft_0_participant.rb |
ruote-beanstalk-2.1.11 | test/functional/ft_0_participant.rb |