test/functional/concurrent_base.rb in ruote-2.1.9 vs test/functional/concurrent_base.rb in ruote-2.1.10
- old
+ new
@@ -8,46 +8,52 @@
require File.join(File.dirname(__FILE__), 'base.rb')
class Ruote::Worker
- def step_by_one
- msg = @storage.get_msgs.first
- #p [ msg['action'], msg['fei'] ]
- if msg
- process(msg)
- else
- false
- end
+ public :process
+end
+
+class Ruote::Engine
+
+ def peek_msg
+ @msgs = @context.storage.get_msgs if ( ! @msgs) || @msgs.size < 1
+ @msgs.shift
end
- public :process
+ def do_process (msg)
+ @context.worker.process(msg)
+ end
- def step_until (&block)
+ def step (count)
+ return if count == 0
loop do
- msg = @storage.get_msgs.first
- return msg if block.call(msg)
- process(msg)
+ m = next_msg
+ next unless m
+ do_process(m)
+ break
end
+ step(count - 1)
end
-end
-class Ruote::Engine
- def step (count=1)
- count.times { @context.worker.step_by_one }
+ def next_msg
+ loop do
+ if m = peek_msg
+ return m
+ end
+ end
end
- def step!
- r = @context.worker.step_by_one
- step! if r == false
- end
- def walk
- while @context.worker.step_by_one do; end
- end
- def do_step (msg)
- @context.worker.process(msg)
- end
- def step_until (&block)
- @context.worker.step_until(&block)
+
+ def gather_msgs
+ (1..77).to_a.inject({}) { |h, i|
+ #(i % 10).times { Thread.pass }
+ sleep 0.001
+ m = peek_msg
+ h[m['_id']] = m if m
+ h
+ }.values.sort { |a, b|
+ a['put_at'] <=> b['put_at']
+ }
end
end
module ConcurrentBase