lib/floe/workflow/runner/docker.rb in floe-0.8.0 vs lib/floe/workflow/runner/docker.rb in floe-0.9.0

- old
+ new

@@ -8,10 +8,11 @@ DOCKER_COMMAND = "docker" def initialize(options = {}) require "awesome_spawn" + require "io/wait" require "tempfile" super @network = options.fetch("network", "bridge") @@ -43,14 +44,67 @@ delete_container(container_id) if container_id delete_secret(secrets_file) if secrets_file end + def wait(timeout: nil, events: %i[create update delete], &block) + until_timestamp = Time.now.utc + timeout if timeout + + r, w = IO.pipe + + pid = AwesomeSpawn.run_detached( + self.class::DOCKER_COMMAND, :err => :out, :out => w, :params => wait_params(until_timestamp) + ) + + w.close + + loop do + readable_timeout = until_timestamp - Time.now.utc if until_timestamp + + # Wait for our end of the pipe to be readable and if it didn't timeout + # get the events from stdout + next if r.wait_readable(readable_timeout).nil? + + # Get all events while the pipe is readable + notices = [] + while r.ready? + notice = r.gets + + # If the process has exited `r.gets` returns `nil` and the pipe is + # always `ready?` + break if notice.nil? + + event, runner_context = parse_notice(notice) + next if event.nil? || !events.include?(event) + + notices << [event, runner_context] + end + + # If we're given a block yield the events otherwise return them + if block + notices.each(&block) + else + # Terminate the `docker events` process before returning the events + sigterm(pid) + + return notices + end + + # Check that the `docker events` process is still alive + Process.kill(0, pid) + rescue Errno::ESRCH + # Break out of the loop if the `docker events` process has exited + break + end + ensure + r.close + end + def status!(runner_context) return if runner_context.key?("Error") - runner_context["container_state"] = inspect_container(runner_context["container_ref"]).first&.dig("State") + runner_context["container_state"] = inspect_container(runner_context["container_ref"])&.dig("State") end def running?(runner_context) !!runner_context.dig("container_state", "Running") end @@ -89,12 +143,49 @@ params << [:v, "#{secrets_file}:/run/secrets:z"] if secrets_file params << [:name, container_name(image)] params << image end + def wait_params(until_timestamp) + params = ["events", [:format, "{{json .}}"], [:filter, "type=container"], [:since, Time.now.utc.to_i]] + params << [:until, until_timestamp.to_i] if until_timestamp + params + end + + def parse_notice(notice) + notice = JSON.parse(notice) + + status = notice["status"] + event = docker_event_status_to_event(status) + running = event != :delete + + name, exit_code = notice.dig("Actor", "Attributes")&.values_at("name", "exitCode") + + runner_context = {"container_ref" => name, "container_state" => {"Running" => running, "ExitCode" => exit_code.to_i}} + + [event, runner_context] + rescue JSON::ParserError + [] + end + + def docker_event_status_to_event(status) + case status + when "create" + :create + when "start" + :update + when "die", "destroy" + :delete + else + :unkonwn + end + end + def inspect_container(container_id) - JSON.parse(docker!("inspect", container_id).output) + JSON.parse(docker!("inspect", container_id).output).first + rescue + nil end def delete_container(container_id) docker!("rm", container_id) rescue @@ -112,9 +203,15 @@ def create_secret(secrets) secrets_file = Tempfile.new secrets_file.write(secrets.to_json) secrets_file.close secrets_file.path + end + + def sigterm(pid) + Process.kill("TERM", pid) + rescue Errno::ESRCH + nil end def global_docker_options [] end