lib/hako/schedulers/ecs.rb in hako-1.2.1 vs lib/hako/schedulers/ecs.rb in hako-1.3.0
- old
+ new
@@ -36,10 +36,11 @@
@dynamic_port_mapping = options.fetch('dynamic_port_mapping', @ecs_elb_options.nil?)
if options.key?('autoscaling')
@autoscaling = EcsAutoscaling.new(options.fetch('autoscaling'), dry_run: @dry_run)
end
@autoscaling_group_for_oneshot = options.fetch('autoscaling_group_for_oneshot', nil)
+ @oneshot_notification_prefix = options.fetch('oneshot_notification_prefix', nil)
@deployment_configuration = {}
%i[maximum_percent minimum_healthy_percent].each do |key|
@deployment_configuration[key] = options.dig('deployment_configuration', key.to_s)
end
@placement_constraints = options.fetch('placement_constraints', [])
@@ -539,15 +540,25 @@
end
end
exit_code
end
+ # Wait until the given one-shot task finishes and return its final
+ # container states, keyed by container name. Dispatches on
+ # @oneshot_notification_prefix (set from the 'oneshot_notification_prefix'
+ # option): when an S3 prefix is configured, poll task status from S3
+ # instead of the rate-limited ecs:DescribeTasks API.
+ # @param [Aws::ECS::Types::Task] task
+ # @return [Hash<String, Aws::ECS::Types::Container>]
+ def wait_for_task(task)
+ if @oneshot_notification_prefix
+ poll_task_status_from_s3(task)
+ else
+ poll_task_status_from_ecs(task)
+ end
+ end
+
MIN_WAIT_TASK_INTERVAL = 1
MAX_WAIT_TASK_INTERVAL = 120
# @param [Aws::ECS::Types::Task] task
- # @return [nil]
- def wait_for_task(task)
+ # @return [Hash<String, Aws::ECS::Types::Container>]
+ def poll_task_status_from_ecs(task)
task_arn = task.task_arn
interval = 1
loop do
begin
task = ecs_client.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0]
@@ -586,9 +597,58 @@
return containers
end
interval = [interval / 2, MIN_WAIT_TASK_INTERVAL].max
Hako.logger.debug("Waiting task with interval=#{interval}")
sleep interval
+ end
+ end
+
+ # @param [Aws::ECS::Types::Task] task
+ # @return [Hash<String, Aws::ECS::Types::Container>]
+ # Experimental: Get stopped container status from S3.
+ # The advantage is scalability; ecs:DescribeTasks is heavily
+ # rate-limited, but s3:GetObject is much more scalable.
+ # The JSON is supposed to be stored from Amazon ECS Event Stream.
+ # http://docs.aws.amazon.com/AmazonECS/latest/developerguide/cloudwatch_event_stream.html
+ def poll_task_status_from_s3(task)
+ s3 = Aws::S3::Client.new
+ task_arn = task.task_arn
+ # @oneshot_notification_prefix is an s3:// URI: host is the bucket,
+ # path (minus the leading slash) is the key prefix.
+ uri = URI.parse(@oneshot_notification_prefix)
+ prefix = uri.path.sub(%r{\A/}, '')
+ started_key = "#{prefix}/#{task_arn}/started.json"
+ stopped_key = "#{prefix}/#{task_arn}/stopped.json"
+
+ loop do
+ # Log the start time once, as soon as started.json appears.
+ unless @started_at
+ begin
+ object = s3.get_object(bucket: uri.host, key: started_key)
+ rescue Aws::S3::Errors::NoSuchKey
+ # Task not started yet, or the event notification has not been
+ # delivered to S3 yet.
+ # NOTE(review): without s3:ListBucket on the bucket, S3 returns
+ # AccessDenied instead of NoSuchKey, which would escape this
+ # rescue and abort the loop — confirm the expected IAM policy.
+ Hako.logger.debug(" s3://#{uri.host}/#{started_key} doesn't exist")
+ else
+ json = JSON.parse(object.body.read)
+ # NOTE(review): assumes json['detail']['startedAt'] is present and
+ # parseable; Time.parse raises on nil/garbage — TODO confirm the
+ # event payload always carries startedAt here.
+ @started_at = Time.parse(json['detail']['startedAt'])
+ if @started_at
+ Hako.logger.info "Started at #{@started_at}"
+ end
+ end
+ end
+
+ begin
+ object = s3.get_object(bucket: uri.host, key: stopped_key)
+ rescue Aws::S3::Errors::NoSuchKey
+ # Task still running; fall through to the sleep below and retry.
+ Hako.logger.debug(" s3://#{uri.host}/#{stopped_key} doesn't exist")
+ else
+ json = JSON.parse(object.body.read)
+ # Re-hydrate the event's 'detail' payload into an
+ # Aws::ECS::Types::Task via the SDK JSON parser, using the element
+ # shape of describe_tasks' 'tasks' output member.
+ # NOTE(review): Aws::Json::Parser and Client.api are aws-sdk
+ # internals, not public API — may break on SDK upgrades.
+ task = Aws::Json::Parser.new(Aws::ECS::Client.api.operation('describe_tasks').output.shape.member(:tasks).shape.member).parse(json['detail'].to_json)
+ Hako.logger.info "Stopped at #{task.stopped_at} (reason: #{task.stopped_reason})"
+ containers = {}
+ task.containers.each do |c|
+ containers[c.name] = c
+ end
+ return containers
+ end
+
+ # Fixed 1-second poll interval; acceptable because s3:GetObject is
+ # far less rate-limited than ecs:DescribeTasks (see method comment).
+ sleep 1
+ end
+ end
# @param [String] container_instance_arn
# @return [nil]