test/functional/concurrent_base.rb in ruote-2.1.9 vs test/functional/concurrent_base.rb in ruote-2.1.10

- old
+ new

@@ -8,46 +8,52 @@ require File.join(File.dirname(__FILE__), 'base.rb') class Ruote::Worker - def step_by_one - msg = @storage.get_msgs.first - #p [ msg['action'], msg['fei'] ] - if msg - process(msg) - else - false - end + public :process +end + +class Ruote::Engine + + def peek_msg + @msgs = @context.storage.get_msgs if ( ! @msgs) || @msgs.size < 1 + @msgs.shift end - public :process + def do_process (msg) + @context.worker.process(msg) + end - def step_until (&block) + def step (count) + return if count == 0 loop do - msg = @storage.get_msgs.first - return msg if block.call(msg) - process(msg) + m = next_msg + next unless m + do_process(m) + break end + step(count - 1) end -end -class Ruote::Engine - def step (count=1) - count.times { @context.worker.step_by_one } + def next_msg + loop do + if m = peek_msg + return m + end + end end - def step! - r = @context.worker.step_by_one - step! if r == false - end - def walk - while @context.worker.step_by_one do; end - end - def do_step (msg) - @context.worker.process(msg) - end - def step_until (&block) - @context.worker.step_until(&block) + + def gather_msgs + (1..77).to_a.inject({}) { |h, i| + #(i % 10).times { Thread.pass } + sleep 0.001 + m = peek_msg + h[m['_id']] = m if m + h + }.values.sort { |a, b| + a['put_at'] <=> b['put_at'] + } end end module ConcurrentBase