lib/hako/schedulers/ecs.rb in hako-0.18.1 vs lib/hako/schedulers/ecs.rb in hako-0.19.0

- old
+ new

@@ -1,16 +1,20 @@ # frozen_string_literal: true require 'aws-sdk' require 'hako' +require 'hako/error' require 'hako/scheduler' require 'hako/schedulers/ecs_definition_comparator' require 'hako/schedulers/ecs_elb' require 'hako/schedulers/ecs_autoscaling' module Hako module Schedulers class Ecs < Scheduler + class NoTasksStarted < Error + end + DEFAULT_CLUSTER = 'default' DEFAULT_FRONT_PORT = 10000 attr_reader :task @@ -25,10 +29,11 @@ if options.key?('autoscaling') @autoscaling = EcsAutoscaling.new(options.fetch('autoscaling'), dry_run: @dry_run) end @started_at = nil @container_instance_arn = nil + @autoscaling_group_for_oneshot = options.fetch('autoscaling_group_for_oneshot', nil) end # @param [Hash<String, Container>] containers # @return [nil] def deploy(containers) @@ -448,13 +453,25 @@ ) result.failures.each do |failure| Hako.logger.error("#{failure.arn} #{failure.reason}") end if result.tasks.empty? - raise 'No tasks started' + raise NoTasksStarted.new('No tasks started') end result.tasks[0] + rescue Aws::ECS::Errors::InvalidParameterException => e + if e.message == 'No Container Instances were found in your cluster.' && on_no_tasks_started(task_definition) + retry + else + raise e + end + rescue NoTasksStarted => e + if on_no_tasks_started(task_definition) + retry + else + raise e + end end # @return [Fixnum] def wait_for_oneshot_finish containers = wait_for_task(@task) @@ -640,9 +657,67 @@ end end end raise "Unable to find rollback target. #{task_definition.task_definition_arn} is INACTIVE?" 
# Hook invoked when a task could not be started. Waits until the cluster can
# host +task_definition+, incrementing the desired capacity of the AutoScaling
# Group named by @autoscaling_group_for_oneshot whenever the group is settled
# (all instances in service and registered to ECS) but still lacks room.
#
# The method polls without an upper bound: it only returns once capacity is
# observed, or raises.
#
# @param [Aws::ECS::Types::TaskDefinition] task_definition
# @return [Boolean] true if the capacity is reserved, false when no
#   AutoScaling Group is configured for oneshot tasks
# @raise [Error] if the configured AutoScaling Group does not exist
def on_no_tasks_started(task_definition)
  unless @autoscaling_group_for_oneshot
    return false
  end

  autoscaling = Aws::AutoScaling::Client.new
  loop do
    asg = autoscaling.describe_auto_scaling_groups(auto_scaling_group_names: [@autoscaling_group_for_oneshot]).auto_scaling_groups[0]
    unless asg
      raise Error.new("AutoScaling Group '#{@autoscaling_group_for_oneshot}' does not exist")
    end

    container_instances = ecs_client.list_container_instances(cluster: @cluster).flat_map do |page|
      arns = page.container_instance_arns
      # A cluster without registered instances yields an empty ARN list, and
      # DescribeContainerInstances rejects an empty list. Skip the call so the
      # scale-out logic below can still run for an empty cluster.
      next [] if arns.empty?
      ecs_client.describe_container_instances(cluster: @cluster, container_instances: arns).container_instances
    end
    if has_capacity?(task_definition, container_instances)
      Hako.logger.info("There's remaining capacity. Start retrying...")
      return true
    end

    # Check autoscaling group health: wait while a previous scaling activity
    # is still in flight instead of stacking another increment on top of it.
    current = asg.instances.count { |i| i.lifecycle_state == 'InService' }
    if asg.desired_capacity != current
      Hako.logger.debug("#{asg.auto_scaling_group_name} isn't in desired state. desired_capacity=#{asg.desired_capacity} in-service instances=#{current}")
      sleep 1
      next
    end

    # Check out-of-service instances: EC2 instances of the group that have
    # not registered themselves as container instances yet.
    registered_ids = container_instances.map(&:ec2_instance_id)
    out_instances = asg.instances.map(&:instance_id) - registered_ids
    unless out_instances.empty?
      Hako.logger.debug("There's instances that is running but not registered as container instances: #{out_instances}")
      sleep 1
      next
    end

    # Scale out
    desired = current + 1
    Hako.logger.info("Increment desired_capacity of #{asg.auto_scaling_group_name} from #{current} to #{desired}")
    autoscaling.set_desired_capacity(auto_scaling_group_name: asg.auto_scaling_group_name, desired_capacity: desired)
  end
end

# Tells whether at least one container instance has enough remaining CPU and
# memory for every container of the task definition at once.
#
# A container definition without a cpu or memory value contributes nothing to
# the requirement, and an instance that exactly matches the requirement counts
# as having capacity.
#
# @param [Aws::ECS::Types::TaskDefinition] task_definition
# @param [Array<Aws::ECS::Types::ContainerInstance>] container_instances
# @return [Boolean]
def has_capacity?(task_definition, container_instances)
  required_cpu, required_memory = task_definition.container_definitions.inject([0, 0]) do |(cpu, memory), d|
    [cpu + d.cpu.to_i, memory + d.memory.to_i]
  end
  container_instances.any? do |ci|
    required_cpu <= remaining_resource_value(ci, 'CPU') && required_memory <= remaining_resource_value(ci, 'MEMORY')
  end
end

# Reads the integer amount of a named remaining resource of a container
# instance.
#
# @param [Aws::ECS::Types::ContainerInstance] container_instance
# @param [String] name resource name such as 'CPU' or 'MEMORY'
# @return [Integer] the remaining amount, or 0 when the resource is not listed
def remaining_resource_value(container_instance, name)
  resource = container_instance.remaining_resources.find { |r| r.name == name }
  resource ? resource.integer_value.to_i : 0
end