Sha256: a43f075c2782837eb0e38f08b767218982b952e4971770ffb422f4ea6a6aa650
Contents?: true
Size: 1.35 KB
Versions: 18
Compression:
Stored size: 1.35 KB
Contents
module Dynflow module Testing class InThreadExecutor < Dynflow::Executors::Abstract def initialize(world) @world = world @director = Director.new(@world) @work_items = Queue.new end def execute(execution_plan_id, finished = Concurrent.future, _wait_for_acceptance = true) feed_queue(@director.start_execution(execution_plan_id, finished)) process_work_items finished end def process_work_items until @work_items.empty? feed_queue(handle_work(@work_items.pop)) clock_tick end end def handle_work(work_item) work_item.execute @director.work_finished(work_item) end def event(execution_plan_id, step_id, event, future = Concurrent.future) event = (Director::Event[execution_plan_id, step_id, event, future]) @director.handle_event(event).each do |work_item| @work_items << work_item end future end def clock_tick @world.clock.progress_all([:periodic_check_inbox]) end def feed_queue(work_items) work_items.each { |work_item| @work_items.push(work_item) } end def terminate(future = Concurrent.future) @director.terminate future.success true rescue => e future.fail e end end end end
Version data entries
18 entries across 18 versions & 1 rubygems