lib/floe/workflow.rb in floe-0.8.0 vs lib/floe/workflow.rb in floe-0.9.0

- old
+ new

@@ -14,25 +14,76 @@ name ||= path_or_io.respond_to?(:read) ? "stream" : path_or_io.split("/").last.split(".").first new(payload, context, credentials, name) end - def wait(workflows, timeout: 5) + def wait(workflows, timeout: nil, &block) + workflows = [workflows] if workflows.kind_of?(self) logger.info("checking #{workflows.count} workflows...") - start = Time.now.utc - ready = [] + run_until = Time.now.utc + timeout if timeout.to_i > 0 + ready = [] + queue = Queue.new + wait_thread = Thread.new do + loop do + Runner.for_resource("docker").wait do |event, runner_context| + queue.push([event, runner_context]) + end + end + end loop do ready = workflows.select(&:step_nonblock_ready?) - break if timeout.zero? || Time.now.utc - start > timeout || !ready.empty? + break if block.nil? && !ready.empty? - sleep(1) + ready.each(&block) + + # Break if all workflows are completed or we've exceeded the + # requested timeout + break if workflows.all?(&:end?) + break if timeout && (timeout.zero? || Time.now.utc > run_until) + + # Find the earliest time that we should wakeup if no container events + # are caught, either a workflow in a Wait or Retry state or we've + # exceeded the requested timeout + wait_until = workflows.map(&:wait_until) + .unshift(run_until) + .compact + .min + + # If a workflow is in a waiting state wakeup the main thread when + # it will be done sleeping + if wait_until + sleep_thread = Thread.new do + sleep_duration = wait_until - Time.now.utc + sleep sleep_duration if sleep_duration > 0 + queue.push(nil) + end + end + + loop do + # Block until an event is raised + event, runner_context = queue.pop + break if event.nil? + + # If the event is for one of our workflows set the updated runner_context + workflows.each do |workflow| + next unless workflow.context.state.dig("RunnerContext", "container_ref") == runner_context["container_ref"] + + workflow.context.state["RunnerContext"] = runner_context + end + + break if queue.empty? + end + ensure + sleep_thread&.kill end logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready") ready + ensure + wait_thread&.kill end end attr_reader :context, :credentials, :payload, :states, :states_by_name, :start_at, :name @@ -72,15 +123,23 @@ step_next current_state.run_nonblock! end - def step_nonblock_wait(timeout: 5) + def step_nonblock_wait(timeout: nil) current_state.wait(:timeout => timeout) end def step_nonblock_ready? current_state.ready? + end + + def waiting? + current_state.waiting? + end + + def wait_until + current_state.wait_until end def status context.status end