lib/command/run.rb in cpl-1.4.0 vs lib/command/run.rb in cpl-2.2.0
- old
+ new
@@ -1,145 +1,573 @@
# frozen_string_literal: true
module Command
- class Run < Base
+ class Run < Base # rubocop:disable Metrics/ClassLength
+ INTERACTIVE_COMMANDS = [
+ "bash",
+ "rails console",
+ "rails c",
+ "rails dbconsole",
+ "rails db"
+ ].freeze
+
NAME = "run"
USAGE = "run COMMAND"
REQUIRES_ARGS = true
DEFAULT_ARGS = ["bash"].freeze
OPTIONS = [
app_option(required: true),
image_option,
+ log_method_option,
workload_option,
location_option,
use_local_token_option,
- terminal_size_option
+ terminal_size_option,
+ interactive_option,
+ detached_option,
+ cpu_option,
+ memory_option,
+ entrypoint_option
].freeze
- DESCRIPTION = "Runs one-off **_interactive_** replicas (analog of `heroku run`)"
+ DESCRIPTION = "Runs one-off interactive or non-interactive replicas (analog of `heroku run`)"
LONG_DESCRIPTION = <<~DESC
- - Runs one-off **_interactive_** replicas (analog of `heroku run`)
- - Uses `Standard` workload type and `cpln exec` as the execution method, with CLI streaming
- - If `fix_terminal_size` is `true` in the `.controlplane/controlplane.yml` file, the remote terminal size will be fixed to match the local terminal size (may also be overriden through `--terminal-size`)
-
- > **IMPORTANT:** Useful for development where it's needed for interaction, and where network connection drops and
- > task crashing are tolerable. For production tasks, it's better to use `cpl run:detached`.
+ - Runs one-off interactive or non-interactive replicas (analog of `heroku run`)
+ - Uses `Cron` workload type and either:
+ - - `cpln workload exec` for interactive mode, with CLI streaming
+ - - log async fetching for non-interactive mode
+ - The Dockerfile entrypoint is used as the command by default, which assumes `exec "${@}"` to be present,
+ and the args ["bash", "-c", cmd_to_run] are passed
+ - The entrypoint can be overridden through `--entrypoint`, which must be a single command or a script path that exists in the container,
+ and the args ["bash", "-c", cmd_to_run] are passed,
+ unless the entrypoint is `bash`, in which case the args ["-c", cmd_to_run] are passed
+ - Providing `--entrypoint none` sets the entrypoint to `bash` by default
+ - If `fix_terminal_size` is `true` in the `.controlplane/controlplane.yml` file,
+ the remote terminal size will be fixed to match the local terminal size (may also be overridden through `--terminal-size`)
+ - By default, all jobs use a CPU size of 1 (1 core) and a memory size of 2Gi (2 gibibytes)
+ (can be configured through `runner_job_default_cpu` and `runner_job_default_memory` in `controlplane.yml`,
+ and also overridden per job through `--cpu` and `--memory`)
+ - By default, the job is stopped if it takes longer than 6 hours to finish
+ (can be configured through `runner_job_timeout` in `controlplane.yml`)
DESC
EXAMPLES = <<~EX
```sh
# Opens shell (bash by default).
cpl run -a $APP_NAME
- # Need to quote COMMAND if setting ENV value or passing args.
- cpl run -a $APP_NAME -- 'LOG_LEVEL=warn rails db:migrate'
+ # Runs interactive command, keeps shell open, and stops job when exiting.
+ cpl run -a $APP_NAME --interactive -- rails c
- # Runs command, displays output, and exits shell.
- cpl run -a $APP_NAME -- ls /
- cpl run -a $APP_NAME -- rails db:migrate:status
+ # Some commands are automatically detected as interactive, so no need to pass `--interactive`.
+ #{INTERACTIVE_COMMANDS.map { |cmd| "cpl run -a $APP_NAME -- #{cmd}" }.join("\n ")}
- # Runs command and keeps shell open.
- cpl run -a $APP_NAME -- rails c
+ # Runs non-interactive command, outputs logs, exits with the exit code of the command and stops job.
+ cpl run -a $APP_NAME -- rails db:migrate
+ # Runs non-interactive command, detaches, exits with 0, and prints commands to:
+ # - see logs from the job
+ # - stop the job
+ cpl run -a $APP_NAME --detached -- rails db:migrate
+
+ # The command needs to be quoted if setting an env variable or passing args.
+ cpl run -a $APP_NAME -- 'SOME_ENV_VAR=some_value rails db:migrate'
+
# Uses a different image (which may not be promoted yet).
cpl run -a $APP_NAME --image appimage:123 -- rails db:migrate # Exact image name
cpl run -a $APP_NAME --image latest -- rails db:migrate # Latest sequential image
# Uses a different workload than `one_off_workload` from `.controlplane/controlplane.yml`.
cpl run -a $APP_NAME -w other-workload -- bash
# Overrides remote CPLN_TOKEN env variable with local token.
# Useful when superuser rights are needed in remote container.
cpl run -a $APP_NAME --use-local-token -- bash
+
+ # Replaces the existing Dockerfile entrypoint with `bash`.
+ cpl run -a $APP_NAME --entrypoint none -- rails db:migrate
+
+ # Replaces the existing Dockerfile entrypoint.
+ cpl run -a $APP_NAME --entrypoint /app/alternative-entrypoint.sh -- rails db:migrate
```
EX
- attr_reader :location, :workload_to_clone, :workload_clone, :container
+ DEFAULT_JOB_CPU = "1"
+ DEFAULT_JOB_MEMORY = "2Gi"
+ DEFAULT_JOB_TIMEOUT = 21_600 # 6 hours
+ DEFAULT_JOB_HISTORY_LIMIT = 10
+ MAGIC_END = "---cpl run command finished---"
- def call # rubocop:disable Metrics/MethodLength
+ attr_reader :interactive, :detached, :location, :original_workload, :runner_workload,
+ :default_image, :default_cpu, :default_memory, :job_timeout, :job_history_limit,
+ :container, :expected_deployed_version, :job, :replica, :command
+
+ def call # rubocop:disable Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
+ @interactive = config.options[:interactive] || interactive_command?
+ @detached = config.options[:detached]
+ @log_method = config.options[:log_method]
+
@location = config.location
- @workload_to_clone = config.options["workload"] || config[:one_off_workload]
- @workload_clone = "#{workload_to_clone}-run-#{random_four_digits}"
+ @original_workload = config.options[:workload] || config[:one_off_workload]
+ @runner_workload = "#{original_workload}-runner"
+ @default_image = "#{config.app}:#{Controlplane::NO_IMAGE_AVAILABLE}"
+ @default_cpu = config.current[:runner_job_default_cpu] || DEFAULT_JOB_CPU
+ @default_memory = config.current[:runner_job_default_memory] || DEFAULT_JOB_MEMORY
+ @job_timeout = config.current[:runner_job_timeout] || DEFAULT_JOB_TIMEOUT
+ @job_history_limit = DEFAULT_JOB_HISTORY_LIMIT
- step("Cloning workload '#{workload_to_clone}' on app '#{config.options[:app]}' to '#{workload_clone}'") do
- clone_workload
+ unless interactive
+ @internal_sigint = false
+
+ # Catch Ctrl+C in the main process
+ trap("SIGINT") do
+ unless @internal_sigint
+ print_detached_commands
+ exit(ExitCode::INTERRUPT)
+ end
+ end
end
- wait_for_workload(workload_clone)
- wait_for_replica(workload_clone, location)
- run_in_replica
- ensure
+ create_runner_workload if cp.fetch_workload(runner_workload).nil?
+ wait_for_runner_workload_deploy
+ update_runner_workload
+ wait_for_runner_workload_update if expected_deployed_version
+
+ start_job
+ wait_for_replica_for_job
+
progress.puts
- ensure_workload_deleted(workload_clone)
+ if interactive
+ run_interactive
+ else
+ run_non_interactive
+ end
end
private
- def clone_workload # rubocop:disable Metrics/MethodLength
- # Create a base copy of workload props
- spec = cp.fetch_workload!(workload_to_clone).fetch("spec")
- container_spec = spec["containers"].detect { _1["name"] == workload_to_clone } || spec["containers"].first
- @container = container_spec["name"]
+ def interactive_command?
+ INTERACTIVE_COMMANDS.include?(args_join(config.args))
+ end
- # remove other containers if any
- spec["containers"] = [container_spec]
+ def app_workload_replica_args
+ ["-a", config.app, "--workload", runner_workload, "--replica", replica]
+ end
- # Stub workload command with dummy server that just responds to port
- # Needed to avoid execution of ENTRYPOINT and CMD of Dockerfile
- container_spec["command"] = "ruby"
- container_spec["args"] = ["-e", Scripts.http_dummy_server_ruby]
+ def create_runner_workload # rubocop:disable Metrics/MethodLength
+ step("Creating runner workload '#{runner_workload}' based on '#{original_workload}'") do
+ spec, container_spec = base_workload_specs(original_workload)
- # Ensure one-off workload will be running
- spec["defaultOptions"]["suspend"] = false
+ # Remove other containers if any
+ spec["containers"] = [container_spec]
- # Ensure no scaling
- spec["defaultOptions"]["autoscaling"]["minScale"] = 1
- spec["defaultOptions"]["autoscaling"]["maxScale"] = 1
- spec["defaultOptions"]["capacityAI"] = false
+ # Default to using existing Dockerfile entrypoint
+ container_spec.delete("command")
- # Override image if specified
- image = config.options[:image]
- image = latest_image if image == "latest"
- container_spec["image"] = "/org/#{config.org}/image/#{image}" if image
+ # Remove props that conflict with job
+ container_spec.delete("ports")
+ container_spec.delete("lifecycle")
+ container_spec.delete("livenessProbe")
+ container_spec.delete("readinessProbe")
- # Set runner
- container_spec["env"] ||= []
- container_spec["env"] << { "name" => "CONTROLPLANE_RUNNER", "value" => runner_script }
+ # Set image, CPU, and memory to default values
+ container_spec["image"] = default_image
+ container_spec["cpu"] = default_cpu
+ container_spec["memory"] = default_memory
- if config.options["use_local_token"]
- container_spec["env"] << { "name" => "CONTROLPLANE_TOKEN",
- "value" => ControlplaneApiDirect.new.api_token[:token] }
+ # Ensure cron workload won't run per schedule
+ spec["defaultOptions"]["suspend"] = true
+
+ # Ensure no scaling
+ spec["defaultOptions"]["autoscaling"] = {}
+ spec["defaultOptions"]["capacityAI"] = false
+
+ # Set cron job props
+ spec["type"] = "cron"
+ spec["job"] = {
+ # Next job set to run on January 1st, 2029
+ "schedule" => "0 0 1 1 1",
+
+ "restartPolicy" => "Never",
+ "activeDeadlineSeconds" => job_timeout,
+ "historyLimit" => job_history_limit
+ }
+
+ # Create runner workload
+ cp.apply_hash("kind" => "workload", "name" => runner_workload, "spec" => spec)
end
+ end
- # Create workload clone
- cp.apply_hash("kind" => "workload", "name" => workload_clone, "spec" => spec)
+ def update_runner_workload # rubocop:disable Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
+ should_update = false
+ spec = nil
+
+ step("Checking if runner workload '#{runner_workload}' needs to be updated") do # rubocop:disable Metrics/BlockLength
+ _, original_container_spec = base_workload_specs(original_workload)
+ spec, container_spec = base_workload_specs(runner_workload)
+
+ # Keep ENV synced between original and runner workloads
+ original_env_str = original_container_spec["env"]&.sort_by { |env| env["name"] }.to_s
+ env_str = container_spec["env"]&.sort_by { |env| env["name"] }.to_s
+ if original_env_str != env_str
+ container_spec["env"] = original_container_spec["env"]
+ should_update = true
+ end
+
+ if container_spec["image"] != default_image
+ container_spec["image"] = default_image
+ should_update = true
+ end
+
+ if container_spec["cpu"] != default_cpu
+ container_spec["cpu"] = default_cpu
+ should_update = true
+ end
+
+ if container_spec["memory"] != default_memory
+ container_spec["memory"] = default_memory
+ should_update = true
+ end
+
+ if spec["job"]["activeDeadlineSeconds"] != job_timeout
+ spec["job"]["activeDeadlineSeconds"] = job_timeout
+ should_update = true
+ end
+
+ if spec["job"]["historyLimit"] != job_history_limit
+ spec["job"]["historyLimit"] = job_history_limit
+ should_update = true
+ end
+
+ true
+ end
+
+ return unless should_update
+
+ step("Updating runner workload '#{runner_workload}'") do
+ # Update runner workload
+ @expected_deployed_version = cp.cron_workload_deployed_version(runner_workload) + 1
+ cp.apply_hash("kind" => "workload", "name" => runner_workload, "spec" => spec)
+ end
end
- def runner_script # rubocop:disable Metrics/MethodLength
- script = Scripts.helpers_cleanup
+ def wait_for_runner_workload_deploy
+ step("Waiting for runner workload '#{runner_workload}' to be deployed", retry_on_failure: true) do
+ !cp.cron_workload_deployed_version(runner_workload).nil?
+ end
+ end
- if config.options["use_local_token"]
- script += <<~SHELL
- CPLN_TOKEN=$CONTROLPLANE_TOKEN
- unset CONTROLPLANE_TOKEN
- SHELL
+ def wait_for_runner_workload_update
+ step("Waiting for runner workload '#{runner_workload}' to be updated", retry_on_failure: true) do
+ cp.cron_workload_deployed_version(runner_workload) >= expected_deployed_version
end
+ end
+ def start_job
+ job_start_yaml = build_job_start_yaml
+
+ step("Starting job for runner workload '#{runner_workload}'", retry_on_failure: true) do
+ result = cp.start_cron_workload(runner_workload, job_start_yaml, location: location)
+ @job = result&.dig("items", 0, "id")
+
+ job || false
+ end
+ end
+
+ def wait_for_replica_for_job
+ step("Waiting for replica to start, which runs job '#{job}'", retry_on_failure: true) do
+ result = cp.fetch_workload_replicas(runner_workload, location: location)
+ @replica = result["items"].find { |item| item.include?(job) }
+
+ replica || false
+ end
+ end
+
+ def run_interactive
+ progress.puts("Connecting to replica '#{replica}'...\n\n")
+ cp.workload_exec(runner_workload, replica, location: location, container: container, command: command)
+ end
+
+ def run_non_interactive
+ if detached
+ print_detached_commands
+ exit(ExitCode::SUCCESS)
+ end
+
+ case @log_method
+ when 1 then run_non_interactive_v1
+ when 2 then run_non_interactive_v2
+ when 3 then run_non_interactive_v3
+ else raise "Invalid log method: #{@log_method}"
+ end
+ end
+
+ def run_non_interactive_v1 # rubocop:disable Metrics/MethodLength
+ logs_pid = Process.fork do
+ # Catch Ctrl+C in the forked process
+ trap("SIGINT") do
+ exit(ExitCode::SUCCESS)
+ end
+
+ Cpl::Cli.start(["logs", *app_workload_replica_args])
+ end
+ Process.detach(logs_pid)
+
+ exit_status = wait_for_job_status
+
+ # We need to wait a bit for the logs to appear,
+ # otherwise it may exit without showing them
+ Kernel.sleep(30)
+
+ @internal_sigint = true
+ Process.kill("INT", logs_pid)
+ exit(exit_status)
+ end
+
+ def run_non_interactive_v2
+ current_cpl = File.expand_path("cpl", "#{__dir__}/../..")
+ logs_pipe = IO.popen([current_cpl, "logs", *app_workload_replica_args])
+
+ exit_status = wait_for_job_status_and_log(logs_pipe)
+
+ @internal_sigint = true
+ Process.kill("INT", logs_pipe.pid)
+ exit(exit_status)
+ end
+
+ def run_non_interactive_v3
+ exit(show_logs_waiting)
+ end
+
+ def base_workload_specs(workload)
+ spec = cp.fetch_workload!(workload).fetch("spec")
+ container_spec = spec["containers"].detect { _1["name"] == original_workload } || spec["containers"].first
+
+ [spec, container_spec]
+ end
+
+ def build_job_start_yaml # rubocop:disable Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
+ _, original_container_spec = base_workload_specs(original_workload)
+ @container = original_container_spec["name"]
+
+ job_start_hash = { "name" => container }
+
+ if config.options[:use_local_token]
+ job_start_hash["env"] ||= []
+ job_start_hash["env"].push({ "name" => "CPL_TOKEN", "value" => ControlplaneApiDirect.new.api_token[:token] })
+ end
+
+ entrypoint = nil
+ if config.options[:entrypoint]
+ entrypoint = config.options[:entrypoint] == "none" ? "bash" : config.options[:entrypoint]
+ end
+
+ job_start_hash["command"] = entrypoint if entrypoint
+ job_start_hash["args"] ||= []
+ job_start_hash["args"].push("bash") unless entrypoint == "bash"
+ job_start_hash["args"].push("-c")
+ job_start_hash["env"] ||= []
+ job_start_hash["env"].push({ "name" => "CPL_RUNNER_SCRIPT", "value" => runner_script })
+ if interactive
+ job_start_hash["env"].push({ "name" => "CPL_MONITORING_SCRIPT", "value" => interactive_monitoring_script })
+
+ job_start_hash["args"].push('eval "$CPL_MONITORING_SCRIPT"')
+ @command = %(bash -c 'eval "$CPL_RUNNER_SCRIPT"')
+ else
+ job_start_hash["args"].push('eval "$CPL_RUNNER_SCRIPT"')
+ end
+
+ image = config.options[:image]
+ image_link = if image
+ image = cp.latest_image if image == "latest"
+ "/org/#{config.org}/image/#{image}"
+ else
+ original_container_spec["image"]
+ end
+
+ job_start_hash["image"] = image_link
+ job_start_hash["cpu"] = config.options[:cpu] if config.options[:cpu]
+ job_start_hash["memory"] = config.options[:memory] if config.options[:memory]
+
+ job_start_hash.to_yaml
+ end
+
+ def interactive_monitoring_script
+ <<~SCRIPT
+ primary_pid=""
+
+ check_primary() {
+ if ! kill -0 $primary_pid 2>/dev/null; then
+ echo "Primary process has exited. Shutting down."
+ exit 0
+ fi
+ }
+
+ while true; do
+ if [[ -z "$primary_pid" ]]; then
+ primary_pid=$(ps -eo pid,etime,cmd --sort=etime | grep -v "$$" | grep -v 'ps -eo' | grep -v 'grep' | grep 'CPL_RUNNER_SCRIPT' | head -n 1 | awk '{print $1}')
+ if [[ ! -z "$primary_pid" ]]; then
+ echo "Primary process set with PID: $primary_pid"
+ fi
+ else
+ check_primary
+ fi
+
+ sleep 1
+ done
+ SCRIPT
+ end
+
+ def interactive_runner_script
+ script = ""
+
# NOTE: fixes terminal size to match local terminal
if config.current[:fix_terminal_size] || config.options[:terminal_size]
if config.options[:terminal_size]
rows, cols = config.options[:terminal_size].split(",")
else
+ # NOTE: cannot use `Shell.cmd` here, as `stty size` has to run in a terminal environment
rows, cols = `stty size`.split(/\s+/)
end
- script += "stty rows #{rows}\nstty cols #{cols}\n" if rows && cols
+ script += "stty rows #{rows}\nstty cols #{cols}\n"
end
- script += args_join(config.args)
script
end
- def run_in_replica
- progress.puts("Connecting...\n\n")
- command = %(bash -c 'eval "$CONTROLPLANE_RUNNER"')
- cp.workload_exec(workload_clone, location: location, container: container, command: command)
+ def runner_script # rubocop:disable Metrics/MethodLength
+ script = <<~SCRIPT
+ unset CPL_RUNNER_SCRIPT
+ unset CPL_MONITORING_SCRIPT
+
+ if [ -n "$CPL_TOKEN" ]; then
+ CPLN_TOKEN=$CPL_TOKEN
+ unset CPL_TOKEN
+ fi
+ SCRIPT
+
+ script += interactive_runner_script if interactive
+
+ script +=
+ if @log_method == 1 || @interactive
+ args_join(config.args)
+ else
+ <<~SCRIPT
+ ( #{args_join(config.args)} )
+ CPL_EXIT_CODE=$?
+ echo '#{MAGIC_END}'
+ exit $CPL_EXIT_CODE
+ SCRIPT
+ end
+
+ script
+ end
+
+ def wait_for_job_status
+ Kernel.sleep(1) until (exit_code = resolve_job_status)
+ exit_code
+ end
+
+ def wait_for_job_status_and_log(logs_pipe) # rubocop:disable Metrics/MethodLength
+ no_logs_counter = 0
+
+ loop do
+ no_logs_counter += 1
+ break if no_logs_counter > 60 # 30s
+ break if logs_pipe.eof?
+ next Kernel.sleep(0.5) unless logs_pipe.ready?
+
+ no_logs_counter = 0
+ line = logs_pipe.gets
+ break if line.chomp == MAGIC_END
+
+ puts(line)
+ end
+
+ resolve_job_status
+ end
+
+ def print_detached_commands
+ return unless replica
+
+ app_workload_replica_config = app_workload_replica_args.join(" ")
+ progress.puts(
+ "\n\n" \
+ "- To view logs from the job, run:\n `cpl logs #{app_workload_replica_config}`\n" \
+ "- To stop the job, run:\n `cpl ps:stop #{app_workload_replica_config}`\n"
+ )
+ end
+
+ def resolve_job_status # rubocop:disable Metrics/MethodLength
+ loop do
+ result = cp.fetch_cron_workload(runner_workload, location: location)
+ job_details = result&.dig("items")&.find { |item| item["id"] == job }
+ status = job_details&.dig("status")
+
+ Shell.debug("JOB STATUS", status)
+
+ case status
+ when "active", "pending"
+ sleep 1
+ when "successful"
+ break ExitCode::SUCCESS
+ else
+ break ExitCode::ERROR_DEFAULT
+ end
+ end
+ end
+
+ ###########################################
+ ### temporary extraction from run:detached
+ ###########################################
+ def show_logs_waiting # rubocop:disable Metrics/MethodLength
+ retries = 0
+ begin
+ job_finished_count = 0
+ loop do
+ case print_uniq_logs
+ when :finished
+ break
+ when :changed
+ next
+ else
+ job_finished_count += 1 if resolve_job_status
+ break if job_finished_count > 5
+
+ sleep(1)
+ end
+ end
+
+ resolve_job_status
+ rescue RuntimeError => e
+ raise "#{e} Exiting..." unless retries < 10 # MAX_RETRIES
+
+ progress.puts(Shell.color("ERROR: #{e} Retrying...", :red))
+ retries += 1
+ retry
+ end
+ end
+
+ def print_uniq_logs
+ status = nil
+
+ @printed_log_entries ||= []
+ ts = Time.now.to_i
+ entries = normalized_log_entries(from: ts - 60, to: ts)
+
+ (entries - @printed_log_entries).sort.each do |(_ts, val)|
+ status ||= :changed
+ val.chomp == MAGIC_END ? status = :finished : progress.puts(val)
+ end
+
+ @printed_log_entries = entries # as well truncate old entries if any
+
+ status || :unchanged
+ end
+
+ def normalized_log_entries(from:, to:)
+ log = cp.log_get(workload: runner_workload, from: from, to: to, replica: replica)
+
+ log["data"]["result"]
+ .each_with_object([]) { |obj, result| result.concat(obj["values"]) }
+ .select { |ts, _val| ts[..-10].to_i > from }
end
end
end