lib/floe/workflow.rb in floe-0.8.0 vs lib/floe/workflow.rb in floe-0.9.0
- old
+ new
@@ -14,25 +14,76 @@
name ||= path_or_io.respond_to?(:read) ? "stream" : path_or_io.split("/").last.split(".").first
new(payload, context, credentials, name)
end
- def wait(workflows, timeout: 5)
+ def wait(workflows, timeout: nil, &block)
+ workflows = [workflows] if workflows.kind_of?(self)
logger.info("checking #{workflows.count} workflows...")
- start = Time.now.utc
- ready = []
+ run_until = Time.now.utc + timeout if timeout.to_i > 0
+ ready = []
+ queue = Queue.new
+ wait_thread = Thread.new do
+ loop do
+ Runner.for_resource("docker").wait do |event, runner_context|
+ queue.push([event, runner_context])
+ end
+ end
+ end
loop do
ready = workflows.select(&:step_nonblock_ready?)
- break if timeout.zero? || Time.now.utc - start > timeout || !ready.empty?
+ break if block.nil? && !ready.empty?
- sleep(1)
+ ready.each(&block)
+
+ # Break if all workflows are completed or we've exceeded the
+ # requested timeout
+ break if workflows.all?(&:end?)
+ break if timeout && (timeout.zero? || Time.now.utc > run_until)
+
+ # Find the earliest time that we should wakeup if no container events
+ # are caught, either a workflow in a Wait or Retry state or we've
+ # exceeded the requested timeout
+ wait_until = workflows.map(&:wait_until)
+ .unshift(run_until)
+ .compact
+ .min
+
+ # If a workflow is in a waiting state wakeup the main thread when
+ # it will be done sleeping
+ if wait_until
+ sleep_thread = Thread.new do
+ sleep_duration = wait_until - Time.now.utc
+ sleep sleep_duration if sleep_duration > 0
+ queue.push(nil)
+ end
+ end
+
+ loop do
+ # Block until an event is raised
+ event, runner_context = queue.pop
+ break if event.nil?
+
+ # If the event is for one of our workflows set the updated runner_context
+ workflows.each do |workflow|
+ next unless workflow.context.state.dig("RunnerContext", "container_ref") == runner_context["container_ref"]
+
+ workflow.context.state["RunnerContext"] = runner_context
+ end
+
+ break if queue.empty?
+ end
+ ensure
+ sleep_thread&.kill
end
logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
ready
+ ensure
+ wait_thread&.kill
end
end
attr_reader :context, :credentials, :payload, :states, :states_by_name, :start_at, :name
@@ -72,15 +123,23 @@
step_next
current_state.run_nonblock!
end
- def step_nonblock_wait(timeout: 5)
+ def step_nonblock_wait(timeout: nil)
current_state.wait(:timeout => timeout)
end
def step_nonblock_ready?
current_state.ready?
+ end
+
+ def waiting?
+ current_state.waiting?
+ end
+
+ def wait_until
+ current_state.wait_until
end
def status
context.status
end