lib/floe/workflow/runner/docker.rb in floe-0.8.0 vs lib/floe/workflow/runner/docker.rb in floe-0.9.0
- old
+ new
@@ -8,10 +8,11 @@
DOCKER_COMMAND = "docker"
def initialize(options = {})
require "awesome_spawn"
+ require "io/wait"
require "tempfile"
super
@network = options.fetch("network", "bridge")
@@ -43,14 +44,67 @@
delete_container(container_id) if container_id
delete_secret(secrets_file) if secrets_file
end
+ def wait(timeout: nil, events: %i[create update delete], &block)
+ until_timestamp = Time.now.utc + timeout if timeout
+
+ r, w = IO.pipe
+
+ pid = AwesomeSpawn.run_detached(
+ self.class::DOCKER_COMMAND, :err => :out, :out => w, :params => wait_params(until_timestamp)
+ )
+
+ w.close
+
+ loop do
+ readable_timeout = until_timestamp - Time.now.utc if until_timestamp
+
+ # Wait for our end of the pipe to be readable and if it didn't timeout
+ # get the events from stdout
+ next if r.wait_readable(readable_timeout).nil?
+
+ # Get all events while the pipe is readable
+ notices = []
+ while r.ready?
+ notice = r.gets
+
+ # If the process has exited `r.gets` returns `nil` and the pipe is
+ # always `ready?`
+ break if notice.nil?
+
+ event, runner_context = parse_notice(notice)
+ next if event.nil? || !events.include?(event)
+
+ notices << [event, runner_context]
+ end
+
+ # If we're given a block yield the events otherwise return them
+ if block
+ notices.each(&block)
+ else
+ # Terminate the `docker events` process before returning the events
+ sigterm(pid)
+
+ return notices
+ end
+
+ # Check that the `docker events` process is still alive
+ Process.kill(0, pid)
+ rescue Errno::ESRCH
+ # Break out of the loop if the `docker events` process has exited
+ break
+ end
+ ensure
+ r.close
+ end
+
def status!(runner_context)
return if runner_context.key?("Error")
- runner_context["container_state"] = inspect_container(runner_context["container_ref"]).first&.dig("State")
+ runner_context["container_state"] = inspect_container(runner_context["container_ref"])&.dig("State")
end
def running?(runner_context)
!!runner_context.dig("container_state", "Running")
end
@@ -89,12 +143,49 @@
params << [:v, "#{secrets_file}:/run/secrets:z"] if secrets_file
params << [:name, container_name(image)]
params << image
end
+ def wait_params(until_timestamp)
+ params = ["events", [:format, "{{json .}}"], [:filter, "type=container"], [:since, Time.now.utc.to_i]]
+ params << [:until, until_timestamp.to_i] if until_timestamp
+ params
+ end
+
+ def parse_notice(notice)
+ notice = JSON.parse(notice)
+
+ status = notice["status"]
+ event = docker_event_status_to_event(status)
+ running = event != :delete
+
+ name, exit_code = notice.dig("Actor", "Attributes")&.values_at("name", "exitCode")
+
+ runner_context = {"container_ref" => name, "container_state" => {"Running" => running, "ExitCode" => exit_code.to_i}}
+
+ [event, runner_context]
+ rescue JSON::ParserError
+ []
+ end
+
+ def docker_event_status_to_event(status)
+ case status
+ when "create"
+ :create
+ when "start"
+ :update
+ when "die", "destroy"
+ :delete
+ else
+ :unkonwn
+ end
+ end
+
def inspect_container(container_id)
- JSON.parse(docker!("inspect", container_id).output)
+ JSON.parse(docker!("inspect", container_id).output).first
+ rescue
+ nil
end
def delete_container(container_id)
docker!("rm", container_id)
rescue
@@ -112,9 +203,15 @@
def create_secret(secrets)
secrets_file = Tempfile.new
secrets_file.write(secrets.to_json)
secrets_file.close
secrets_file.path
+ end
+
+ def sigterm(pid)
+ Process.kill("TERM", pid)
+ rescue Errno::ESRCH
+ nil
end
def global_docker_options
[]
end