lib/hako/schedulers/ecs.rb in hako-1.2.1 vs lib/hako/schedulers/ecs.rb in hako-1.3.0

- old
+ new

@@ -36,10 +36,11 @@
          @dynamic_port_mapping = options.fetch('dynamic_port_mapping', @ecs_elb_options.nil?)
          if options.key?('autoscaling')
            @autoscaling = EcsAutoscaling.new(options.fetch('autoscaling'), dry_run: @dry_run)
          end
          @autoscaling_group_for_oneshot = options.fetch('autoscaling_group_for_oneshot', nil)
+         @oneshot_notification_prefix = options.fetch('oneshot_notification_prefix', nil)
          @deployment_configuration = {}
          %i[maximum_percent minimum_healthy_percent].each do |key|
            @deployment_configuration[key] = options.dig('deployment_configuration', key.to_s)
          end
          @placement_constraints = options.fetch('placement_constraints', [])
@@ -539,15 +540,25 @@
            end
          end
          exit_code
        end
+       # Wait on the task by polling S3 when the oneshot_notification_prefix
+       # option is set, and by polling ECS otherwise.
+       # @param [Aws::ECS::Types::Task] task
+       # @return [Hash<String, Aws::ECS::Types::Container>]
+       def wait_for_task(task)
+         if @oneshot_notification_prefix
+           poll_task_status_from_s3(task)
+         else
+           poll_task_status_from_ecs(task)
+         end
+       end
+
        MIN_WAIT_TASK_INTERVAL = 1
        MAX_WAIT_TASK_INTERVAL = 120
        # @param [Aws::ECS::Types::Task] task
-       # @return [nil]
-       def wait_for_task(task)
+       # @return [Hash<String, Aws::ECS::Types::Container>]
+       def poll_task_status_from_ecs(task)
          task_arn = task.task_arn
          interval = 1
          loop do
            begin
              task = ecs_client.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0]
@@ -586,9 +597,58 @@
              return containers
            end
            interval = [interval / 2, MIN_WAIT_TASK_INTERVAL].max
            Hako.logger.debug("Waiting task with interval=#{interval}")
            sleep interval
+         end
+       end
+
+       # Experimental: Get stopped container status from S3.
+       # The advantage is scalability; ecs:DescribeTasks is heavily
+       # rate-limited, but s3:GetObject is much more scalable.
+       # The JSON objects are expected to have been stored in S3 from the
+       # Amazon ECS event stream:
+       # http://docs.aws.amazon.com/AmazonECS/latest/developerguide/cloudwatch_event_stream.html
+       #
+       # The bucket is the host part of the oneshot_notification_prefix URI and
+       # the key prefix is its path. The loop looks for
+       # "<prefix>/<task ARN>/started.json" (to record and log the start time)
+       # and "<prefix>/<task ARN>/stopped.json", sleeping one second between
+       # attempts, and returns as soon as stopped.json can be read.
+       # @param [Aws::ECS::Types::Task] task
+       # @return [Hash<String, Aws::ECS::Types::Container>] containers taken
+       #   from stopped.json, keyed by container name
+       def poll_task_status_from_s3(task)
+         s3 = Aws::S3::Client.new
+         task_arn = task.task_arn
+         uri = URI.parse(@oneshot_notification_prefix)
+         # Drop the leading "/" of the URI path so it can be used as a key prefix.
+         prefix = uri.path.sub(%r{\A/}, '')
+         started_key = "#{prefix}/#{task_arn}/started.json"
+         stopped_key = "#{prefix}/#{task_arn}/stopped.json"
+
+         loop do
+           # started.json is only fetched until a start time has been recorded.
+           unless @started_at
+             begin
+               object = s3.get_object(bucket: uri.host, key: started_key)
+             rescue Aws::S3::Errors::NoSuchKey
+               # The object is not there yet; try again on the next iteration.
+               Hako.logger.debug(" s3://#{uri.host}/#{started_key} doesn't exist")
+             else
+               json = JSON.parse(object.body.read)
+               @started_at = Time.parse(json['detail']['startedAt'])
+               # NOTE(review): Time.parse returns a Time or raises, so this
+               # guard looks like it is always true -- confirm.
+               if @started_at
+                 Hako.logger.info "Started at #{@started_at}"
+               end
+             end
+           end
+
+           begin
+             object = s3.get_object(bucket: uri.host, key: stopped_key)
+           rescue Aws::S3::Errors::NoSuchKey
+             # The object is not there yet; fall through to the sleep below.
+             Hako.logger.debug(" s3://#{uri.host}/#{stopped_key} doesn't exist")
+           else
+             json = JSON.parse(object.body.read)
+             # Re-serialize the event's 'detail' member and parse it with the
+             # shape of one member of the describe_tasks output's tasks list
+             # (presumably yielding an Aws::ECS::Types::Task -- TODO confirm).
+             task = Aws::Json::Parser.new(Aws::ECS::Client.api.operation('describe_tasks').output.shape.member(:tasks).shape.member).parse(json['detail'].to_json)
+             Hako.logger.info "Stopped at #{task.stopped_at} (reason: #{task.stopped_reason})"
+             containers = {}
+             task.containers.each do |c|
+               containers[c.name] = c
+             end
+             return containers
+           end
+
+           sleep 1
          end
        end
        # @param [String] container_instance_arn
        # @return [nil]