lib/hako/schedulers/ecs.rb in hako-1.6.2 vs lib/hako/schedulers/ecs.rb in hako-1.7.0
- old
+ new
@@ -36,10 +36,14 @@
  @dynamic_port_mapping = options.fetch('dynamic_port_mapping', @ecs_elb_options.nil?)
  if options.key?('autoscaling')
    @autoscaling = EcsAutoscaling.new(options.fetch('autoscaling'), dry_run: @dry_run)
  end
  @autoscaling_group_for_oneshot = options.fetch('autoscaling_group_for_oneshot', nil)
+ @autoscaling_topic_for_oneshot = options.fetch('autoscaling_topic_for_oneshot', nil)
+ if @autoscaling_topic_for_oneshot && !@autoscaling_group_for_oneshot
+   validation_error!('autoscaling_group_for_oneshot must be set when autoscaling_topic_for_oneshot is set')
+ end
  @oneshot_notification_prefix = options.fetch('oneshot_notification_prefix', nil)
  @deployment_configuration = {}
  %i[maximum_percent minimum_healthy_percent].each do |key|
    @deployment_configuration[key] = options.dig('deployment_configuration', key.to_s)
  end
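
Note: `autoscaling_topic_for_oneshot` is the new option in this hunk, and the validation_error! call ties it to the existing `autoscaling_group_for_oneshot`. As a rough sketch, the parsed scheduler options might look like this (the cluster name, group name, and topic ARN are made-up placeholders; only the key names come from the hunk above):

    # Hypothetical parsed scheduler options; values are placeholders.
    options = {
      'cluster' => 'my-cluster',
      'autoscaling_group_for_oneshot' => 'hako-oneshot',
      # New in 1.7.0: an SNS topic that receives scale-out requests.
      # Setting this without autoscaling_group_for_oneshot fails validation.
      'autoscaling_topic_for_oneshot' => 'arn:aws:sns:ap-northeast-1:123456789012:hako-scale-out',
    }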
@@ -636,11 +640,11 @@
    end
  end

  # @param [Aws::ECS::Types::Task] task
  # @return [Hash<String, Aws::ECS::Types::Container>]
- # Experimental: Get stopped container status from S3.
+ # Get stopped container status from S3.
  # The advantage is scalability; ecs:DescribeTasks is heavily
  # rate-limited, but s3:GetObject is much more scalable.
  # The JSON is supposed to be stored from Amazon ECS Event Stream.
  # http://docs.aws.amazon.com/AmazonECS/latest/developerguide/cloudwatch_event_stream.html
  def poll_task_status_from_s3(task)
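
Note: the comment block above explains the design: a subscriber to the Amazon ECS event stream writes each task-state-change event to S3, and hako reads it back with s3:GetObject instead of the heavily rate-limited ecs:DescribeTasks. A minimal sketch of that read path, assuming the prefix has the form s3://bucket/prefix; the key scheme and helper name are assumptions, not hako's actual implementation:

    require 'aws-sdk-s3'
    require 'json'

    # Fetch the stopped-task event JSON for one task (assumed key scheme).
    def fetch_stopped_task_event(oneshot_notification_prefix, task_arn)
      bucket, key_prefix = oneshot_notification_prefix.sub(%r{\As3://}, '').split('/', 2)
      key = "#{key_prefix}/#{task_arn.split('/').last}/stopped.json"
      s3 = Aws::S3::Client.new
      JSON.parse(s3.get_object(bucket: bucket, key: key).body.read)
    rescue Aws::S3::Errors::NoSuchKey
      nil # event not delivered yet; the poller should retry
    end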
@@ -852,18 +856,45 @@
    end
    raise "Unable to find rollback target. #{task_definition.task_definition_arn} is INACTIVE?"
  end

- MIN_ASG_INTERVAL = 1
- MAX_ASG_INTERVAL = 120
  # @param [Aws::ECS::Types::TaskDefinition] task_definition
  # @return [Boolean] true if the capacity is reserved
  def on_no_tasks_started(task_definition)
    unless @autoscaling_group_for_oneshot
      return false
    end
+   if @autoscaling_topic_for_oneshot
+     try_scale_out_with_sns(task_definition)
+   else
+     try_scale_out_with_as(task_definition)
+   end
+ end
+
+ RUN_TASK_INTERVAL = 10
+ def try_scale_out_with_sns(task_definition)
+   required_cpu, required_memory = task_definition.container_definitions.inject([0, 0]) { |(cpu, memory), d| [cpu + d.cpu, memory + d.memory] }
+   @hako_task_id ||= SecureRandom.uuid
+   message = JSON.dump(
+     group_name: @autoscaling_group_for_oneshot,
+     cluster: @cluster,
+     cpu: required_cpu,
+     memory: required_memory,
+     hako_task_id: @hako_task_id,
+   )
+   Hako.logger.info("Unable to start tasks. Publish message to #{@autoscaling_topic_for_oneshot}: #{message}")
+   sns_client = Aws::SNS::Client.new
+   resp = sns_client.publish(topic_arn: @autoscaling_topic_for_oneshot, message: message)
+   Hako.logger.info("Sent message_id=#{resp.message_id}")
+   sleep(RUN_TASK_INTERVAL)
+   true
+ end
+
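
Note: hako itself only publishes the message, sleeps RUN_TASK_INTERVAL seconds, and returns true; some subscriber to the topic (a Lambda function, for example) is presumably responsible for the actual scale-out. A sketch of such a consumer with a naive add-one-instance policy; none of this is part of hako, and a smarter consumer could size the increment from the cpu/memory fields in the message:

    require 'aws-sdk-autoscaling'
    require 'json'

    # Hypothetical consumer of the scale-out message published above.
    def handle_hako_scale_out(message_body)
      request = JSON.parse(message_body)
      client = Aws::AutoScaling::Client.new
      group = client.describe_auto_scaling_groups(
        auto_scaling_group_names: [request['group_name']],
      ).auto_scaling_groups.first
      # Naive policy (assumption): add one instance, capped at max_size.
      desired = [group.desired_capacity + 1, group.max_size].min
      client.set_desired_capacity(
        auto_scaling_group_name: group.auto_scaling_group_name,
        desired_capacity: desired,
      )
    end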
+ MIN_ASG_INTERVAL = 1
+ MAX_ASG_INTERVAL = 120
+ def try_scale_out_with_as(task_definition)
    autoscaling = Aws::AutoScaling::Client.new
    interval = MIN_ASG_INTERVAL
    Hako.logger.info("Unable to start tasks. Start trying scaling out '#{@autoscaling_group_for_oneshot}'")
    loop do
      begin
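
Note: the excerpt ends mid-hunk here. What remains visible is the shape of the fallback path: poll the Auto Scaling group in a loop, sleeping between attempts, with the sleep bounded by MIN_ASG_INTERVAL and MAX_ASG_INTERVAL seconds. A standalone sketch of that bounded-backoff pattern (the doubling step and the capacity_reserved? helper are assumptions; only the two bounds appear in the diff):

    interval = MIN_ASG_INTERVAL
    loop do
      break if capacity_reserved? # hypothetical stand-in for the real check
      sleep(interval)
      interval = [interval * 2, MAX_ASG_INTERVAL].min # assumed growth, capped
    end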