lib/hako/schedulers/ecs.rb in hako-1.6.2 vs lib/hako/schedulers/ecs.rb in hako-1.7.0

- old
+ new

@@ -36,10 +36,14 @@
         @dynamic_port_mapping = options.fetch('dynamic_port_mapping', @ecs_elb_options.nil?)
         if options.key?('autoscaling')
           @autoscaling = EcsAutoscaling.new(options.fetch('autoscaling'), dry_run: @dry_run)
         end
         @autoscaling_group_for_oneshot = options.fetch('autoscaling_group_for_oneshot', nil)
+        @autoscaling_topic_for_oneshot = options.fetch('autoscaling_topic_for_oneshot', nil)
+        if @autoscaling_topic_for_oneshot && !@autoscaling_group_for_oneshot
+          validation_error!('autoscaling_group_for_oneshot must be set when autoscaling_topic_for_oneshot is set')
+        end
         @oneshot_notification_prefix = options.fetch('oneshot_notification_prefix', nil)
         @deployment_configuration = {}
         %i[maximum_percent minimum_healthy_percent].each do |key|
           @deployment_configuration[key] = options.dig('deployment_configuration', key.to_s)
         end
@@ -636,11 +640,11 @@
           end
         end

       # @param [Aws::ECS::Types::Task] task
       # @return [Hash<String, Aws::ECS::Types::Container>]
-      # Experimental: Get stopped container status from S3.
+      # Get stopped container status from S3.
       # The advantage is scalability; ecs:DescribeTasks is heavily
       # rate-limited, but s3:GetObject is much more scalable.
       # The JSON is supposed to be stored from Amazon ECS Event Stream.
       # http://docs.aws.amazon.com/AmazonECS/latest/developerguide/cloudwatch_event_stream.html
       def poll_task_status_from_s3(task)
@@ -852,18 +856,45 @@
         end
         raise "Unable to find rollback target. #{task_definition.task_definition_arn} is INACTIVE?"
       end

-      MIN_ASG_INTERVAL = 1
-      MAX_ASG_INTERVAL = 120
       # @param [Aws::ECS::Types::TaskDefinition] task_definition
       # @return [Boolean] true if the capacity is reserved
       def on_no_tasks_started(task_definition)
         unless @autoscaling_group_for_oneshot
           return false
         end

+        if @autoscaling_topic_for_oneshot
+          try_scale_out_with_sns(task_definition)
+        else
+          try_scale_out_with_as(task_definition)
+        end
+      end
+
+      RUN_TASK_INTERVAL = 10
+      def try_scale_out_with_sns(task_definition)
+        required_cpu, required_memory = task_definition.container_definitions.inject([0, 0]) { |(cpu, memory), d| [cpu + d.cpu, memory + d.memory] }
+        @hako_task_id ||= SecureRandom.uuid
+        message = JSON.dump(
+          group_name: @autoscaling_group_for_oneshot,
+          cluster: @cluster,
+          cpu: required_cpu,
+          memory: required_memory,
+          hako_task_id: @hako_task_id,
+        )
+        Hako.logger.info("Unable to start tasks. Publish message to #{@autoscaling_topic_for_oneshot}: #{message}")
+        sns_client = Aws::SNS::Client.new
+        resp = sns_client.publish(topic_arn: @autoscaling_topic_for_oneshot, message: message)
+        Hako.logger.info("Sent message_id=#{resp.message_id}")
+        sleep(RUN_TASK_INTERVAL)
+        true
+      end
+
+      MIN_ASG_INTERVAL = 1
+      MAX_ASG_INTERVAL = 120
+      def try_scale_out_with_as(task_definition)
         autoscaling = Aws::AutoScaling::Client.new
         interval = MIN_ASG_INTERVAL
         Hako.logger.info("Unable to start tasks. Start trying scaling out '#{@autoscaling_group_for_oneshot}'")
         loop do
           begin
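
The first hunk adds an autoscaling_topic_for_oneshot option and rejects configurations that set it without autoscaling_group_for_oneshot. For illustration only (the option values below are invented, not taken from the diff), the options Hash reaching this code would look like:

    # Illustrative scheduler options as configure receives them; values assumed.
    options = {
      'cluster' => 'batch',
      'autoscaling_group_for_oneshot' => 'hako-oneshot',
      # SNS topic ARN; the region and account id here are placeholders.
      'autoscaling_topic_for_oneshot' => 'arn:aws:sns:ap-northeast-1:123456789012:hako-scale-out',
    }

Setting only autoscaling_topic_for_oneshot would hit the new validation_error! path.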
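
In the third hunk, try_scale_out_with_sns does not scale anything itself: it publishes a JSON message (group_name, cluster, cpu, memory, hako_task_id) to the configured SNS topic, sleeps RUN_TASK_INTERVAL (10) seconds, and returns true so the caller treats capacity as being on the way. An external subscriber is expected to react to the message. A minimal subscriber sketch, not part of hako (the handler name and the add-one-instance policy are assumptions; only the message format comes from the diff):

    require 'json'
    require 'aws-sdk-autoscaling'

    # Hypothetical subscriber: parse the message hako publishes and add one
    # instance to the named Auto Scaling group.
    def scale_out_for_hako(message_body)
      payload = JSON.parse(message_body)
      asg_name = payload.fetch('group_name')
      autoscaling = Aws::AutoScaling::Client.new
      group = autoscaling.describe_auto_scaling_groups(
        auto_scaling_group_names: [asg_name],
      ).auto_scaling_groups.first
      # payload['cpu'] and payload['memory'] could inform a smarter capacity
      # calculation; this sketch naively adds a single instance.
      autoscaling.set_desired_capacity(
        auto_scaling_group_name: asg_name,
        desired_capacity: group.desired_capacity + 1,
        honor_cooldown: false,
      )
    end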
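
try_scale_out_with_as keeps the pre-1.7.0 behaviour of calling the Auto Scaling API directly; the diff is cut off inside its polling loop. The MIN_ASG_INTERVAL / MAX_ASG_INTERVAL constants (1 and 120 seconds) suggest a capped exponential backoff between polls, roughly of this shape (a sketch only; capacity_reserved? is a hypothetical stand-in for the real check, which is truncated above):

    interval = MIN_ASG_INTERVAL
    loop do
      break if capacity_reserved?(task_definition)    # hypothetical predicate
      sleep(interval)
      interval = [interval * 2, MAX_ASG_INTERVAL].min # double the wait, capped at 120s
    end
    true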