lib/hako/schedulers/ecs.rb in hako-0.13.3 vs lib/hako/schedulers/ecs.rb in hako-0.14.0

- old
+ new

@@ -11,23 +11,29 @@ DEFAULT_CLUSTER = 'default' DEFAULT_FRONT_PORT = 10000 attr_reader :task + # @param [Hash<String, Object>] options def configure(options) @cluster = options.fetch('cluster', DEFAULT_CLUSTER) - @desired_count = options.fetch('desired_count') { validation_error!('desired_count must be set') } + @desired_count = options.fetch('desired_count', nil) region = options.fetch('region') { validation_error!('region must be set') } @role = options.fetch('role', nil) @ecs = Aws::ECS::Client.new(region: region) @elb = EcsElb.new(@app_id, Aws::ElasticLoadBalancing::Client.new(region: region), options.fetch('elb', nil)) @ec2 = Aws::EC2::Client.new(region: region) @started_at = nil @container_instance_arn = nil end + # @param [Hash<String, Container>] containers + # @return [nil] def deploy(containers) + unless @desired_count + validation_error!('desired_count must be set') + end front_port = determine_front_port @scripts.each { |script| script.deploy_started(containers, front_port) } definitions = create_definitions(containers) if @dry_run @@ -51,10 +57,14 @@ end Hako.logger.info 'Deployment completed' end end + # @param [Hash<String, Container>] containers + # @param [Array<String>] commands + # @param [Hash<String, String>] env + # @return [nil] def oneshot(containers, commands, env) definitions = create_definitions(containers) definitions.each do |definition| definition.delete(:essential) end @@ -81,18 +91,20 @@ @scripts.each { |script| script.oneshot_started(self) } wait_for_oneshot_finish end end + # @return [nil] def stop_oneshot if @task Hako.logger.warn "Stopping #{@task.task_arn}" @ecs.stop_task(cluster: @cluster, task: @task.task_arn, reason: 'Stopped by hako stop_oneshot') wait_for_oneshot_finish end end + # @return [nil] def status service = describe_service unless service puts 'Unavailable' exit 1 @@ -147,10 +159,11 @@ service.events.first(10).each do |e| puts " #{e.created_at}: #{e.message}" end end + # @return [nil] def remove service = describe_service if service @ecs.delete_service(cluster: @cluster, service: @app_id) Hako.logger.info "#{service.service_arn} is deleted" @@ -161,17 +174,19 @@ @elb.destroy end private + # @return [Aws::ECS::Types::Service, nil] def describe_service service = @ecs.describe_services(cluster: @cluster, services: [@app_id]).services[0] if service && service.status != 'INACTIVE' service end end + # @return [Fixnum] def determine_front_port if @dry_run return DEFAULT_FRONT_PORT end service = describe_service @@ -180,10 +195,11 @@ else new_front_port end end + # @return [Fixnum] def new_front_port max_port = -1 @ecs.list_services(cluster: @cluster).each do |page| unless page.service_arns.empty? @ecs.describe_services(cluster: @cluster, services: page.service_arns).services.each do |s| @@ -201,10 +217,12 @@ else max_port + 1 end end + # @param [Aws::ECS::Types::Service] service + # @return [Fixnum, nil] def find_front_port(service) task_definition = @ecs.describe_task_definition(task_definition: service.task_definition).task_definition container_definitions = {} task_definition.container_definitions.each do |c| container_definitions[c.name] = c @@ -212,10 +230,13 @@ if container_definitions['front'] container_definitions['front'].port_mappings[0].host_port end end + # @param [String] family + # @param [Array<Hash>] definitions + # @return [Boolean] def task_definition_changed?(family, definitions) if @force return true end task_definition = @ecs.describe_task_definition(task_definition: family).task_definition @@ -234,10 +255,12 @@ rescue Aws::ECS::Errors::ClientException # Task definition does not exist true end + # @param [Hash<String, Hash<String, String>>] actual_volumes + # @return [Boolean] def different_volumes?(actual_volumes) if @volumes.size != actual_volumes.size return true end actual_volumes.each do |actual_volume| @@ -251,14 +274,19 @@ end false end + # @param [Hash] expected_container + # @param [Aws::ECS::Types::ContainerDefinition] actual_container + # @return [Boolean] def different_definition?(expected_container, actual_container) EcsDefinitionComparator.new(expected_container).different?(actual_container) end + # @param [Array<Hash>] definitions + # @return [Aws::ECS::Types::TaskDefinition, Symbol] def register_task_definition(definitions) if task_definition_changed?(@app_id, definitions) @ecs.register_task_definition( family: @app_id, container_definitions: definitions, @@ -267,16 +295,20 @@ else :noop end end + # @param [Hash<String, Container>] containers + # @return [nil] def create_definitions(containers) containers.map do |name, container| create_definition(name, container) end end + # @param [Array<Hash>] definitions + # @return [Aws::ECS::Types::TaskDefinition, Symbol] def register_task_definition_for_oneshot(definitions) family = "#{@app_id}-oneshot" if task_definition_changed?(family, definitions) @ecs.register_task_definition( family: "#{@app_id}-oneshot", @@ -286,19 +318,23 @@ else :noop end end + # @return [Hash] def volumes_definition @volumes.map do |name, volume| { name: name, host: { source_path: volume['source_path'] }, } end end + # @param [String] name + # @param [Container] container + # @return [Hash] def create_definition(name, container) environment = container.env.map { |k, v| { name: k, value: v } } { name: name, image: container.image_tag, @@ -311,10 +347,14 @@ docker_labels: container.docker_labels, mount_points: container.mount_points, } end + # @param [Aws::ECS::Types::TaskDefinition] task_definition + # @param [Array<String>] commands + # @param [Hash<String, String>] env + # @return [Aws::ECS::Types::Task] def run_task(task_definition, commands, env) environment = env.map { |k, v| { name: k, value: v } } @ecs.run_task( cluster: @cluster, task_definition: task_definition.task_definition_arn, @@ -330,10 +370,11 @@ count: 1, started_by: 'hako oneshot', ).tasks[0] end + # @return [Fixnum] def wait_for_oneshot_finish containers = wait_for_task(@task) @task = nil Hako.logger.info 'Oneshot task finished' exit_code = 127 @@ -348,10 +389,12 @@ end end exit_code end + # @param [Aws::ECS::Types::Task] task + # @return [nil] def wait_for_task(task) task_arn = task.task_arn loop do task = @ecs.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0] if task.nil? @@ -383,10 +426,12 @@ end sleep 1 end end + # @param [String] container_instance_arn + # @return [nil] def report_container_instance(container_instance_arn) container_instance = @ecs.describe_container_instances(cluster: @cluster, container_instances: [container_instance_arn]).container_instances[0] @ec2.describe_tags(filters: [{ name: 'resource-id', values: [container_instance.ec2_instance_id] }]).each do |page| tag = page.tags.find { |t| t.key == 'Name' } if tag @@ -395,10 +440,13 @@ Hako.logger.info "Container instance is #{container_instance_arn} (#{container_instance.ec2_instance_id})" end end end + # @param [String] task_definition_arn + # @param [Fixnum] front_port + # @return [Aws::ECS::Types::Service, Symbol] def create_or_update_service(task_definition_arn, front_port) service = describe_service if service.nil? params = { cluster: @cluster, @@ -433,38 +481,47 @@ end end SERVICE_KEYS = %i[desired_count task_definition].freeze + # @param [Aws::ECS::Types::Service] service + # @param [Hash] params + # @return [Boolean] def service_changed?(service, params) SERVICE_KEYS.each do |key| if service.public_send(key) != params[key] return true end end false end + # @param [Aws::ECS::Types::Service] service + # @return [nil] def wait_for_ready(service) latest_event_id = find_latest_event_id(service.events) + Hako.logger.debug " latest_event_id=#{latest_event_id}" loop do s = @ecs.describe_services(cluster: service.cluster_arn, services: [service.service_arn]).services[0] s.events.each do |e| if e.id == latest_event_id break end Hako.logger.info "#{e.created_at}: #{e.message}" end latest_event_id = find_latest_event_id(s.events) finished = s.deployments.all? { |d| d.status != 'ACTIVE' } + Hako.logger.debug " latest_event_id=#{latest_event_id}, deployment statuses=#{s.deployments.map(&:status)}" if finished return else sleep 1 end end end + # @param [Array<Aws::ECS::Types::ServiceEvent>] events + # @return [String, nil] def find_latest_event_id(events) if events.empty? nil else events[0].id