lib/hako/schedulers/ecs.rb in hako-0.14.0 vs lib/hako/schedulers/ecs.rb in hako-0.14.1

- old
+ new

@@ -15,15 +15,13 @@ # @param [Hash<String, Object>] options def configure(options) @cluster = options.fetch('cluster', DEFAULT_CLUSTER) @desired_count = options.fetch('desired_count', nil) - region = options.fetch('region') { validation_error!('region must be set') } + @region = options.fetch('region') { validation_error!('region must be set') } @role = options.fetch('role', nil) - @ecs = Aws::ECS::Client.new(region: region) - @elb = EcsElb.new(@app_id, Aws::ElasticLoadBalancing::Client.new(region: region), options.fetch('elb', nil)) - @ec2 = Aws::EC2::Client.new(region: region) + @ecs_elb_options = options.fetch('elb', nil) @started_at = nil @container_instance_arn = nil end # @param [Hash<String, Container>] containers @@ -42,11 +40,11 @@ end else task_definition = register_task_definition(definitions) if task_definition == :noop Hako.logger.info "Task definition isn't changed" - task_definition = @ecs.describe_task_definition(task_definition: @app_id).task_definition + task_definition = ecs_client.describe_task_definition(task_definition: @app_id).task_definition else Hako.logger.info "Registered task definition: #{task_definition.task_definition_arn}" end service = create_or_update_service(task_definition.task_definition_arn, front_port) if service == :noop @@ -80,11 +78,11 @@ 0 else task_definition = register_task_definition_for_oneshot(definitions) if task_definition == :noop Hako.logger.info "Task definition isn't changed" - task_definition = @ecs.describe_task_definition(task_definition: "#{@app_id}-oneshot").task_definition + task_definition = ecs_client.describe_task_definition(task_definition: "#{@app_id}-oneshot").task_definition else Hako.logger.info "Registered task definition: #{task_definition.task_definition_arn}" end @task = run_task(task_definition, commands, env) Hako.logger.info "Started task: #{@task.task_arn}" @@ -95,11 +93,11 @@ # @return [nil] def stop_oneshot if @task Hako.logger.warn "Stopping #{@task.task_arn}" - @ecs.stop_task(cluster: @cluster, task: @task.task_arn, reason: 'Stopped by hako stop_oneshot') + ecs_client.stop_task(cluster: @cluster, task: @task.task_arn, reason: 'Stopped by hako stop_oneshot') wait_for_oneshot_finish end end # @return [nil] @@ -110,11 +108,11 @@ exit 1 end unless service.load_balancers.empty? lb = service.load_balancers[0] - lb_detail = @elb.describe_load_balancer + lb_detail = ecs_elb_client.describe_load_balancer puts 'Load balancer:' lb_detail.listener_descriptions.each do |ld| l = ld.listener puts " #{lb_detail.dns_name}:#{l.load_balancer_port} -> #{lb.container_name}:#{lb.container_port}" end @@ -125,19 +123,19 @@ abbrev_task_definition = d.task_definition.slice(%r{task-definition/(.+)\z}, 1) puts " [#{d.status}] #{abbrev_task_definition} desired_count=#{d.desired_count}, pending_count=#{d.pending_count}, running_count=#{d.running_count}" end puts 'Tasks:' - @ecs.list_tasks(cluster: @cluster, service_name: service.service_arn).each do |page| + ecs_client.list_tasks(cluster: @cluster, service_name: service.service_arn).each do |page| unless page.task_arns.empty? - tasks = @ecs.describe_tasks(cluster: @cluster, tasks: page.task_arns).tasks + tasks = ecs_client.describe_tasks(cluster: @cluster, tasks: page.task_arns).tasks container_instances = {} - @ecs.describe_container_instances(cluster: @cluster, container_instances: tasks.map(&:container_instance_arn)).container_instances.each do |ci| + ecs_client.describe_container_instances(cluster: @cluster, container_instances: tasks.map(&:container_instance_arn)).container_instances.each do |ci| container_instances[ci.container_instance_arn] = ci end ec2_instances = {} - @ec2.describe_instances(instance_ids: container_instances.values.map(&:ec2_instance_id)).reservations.each do |r| + ec2_client.describe_instances(instance_ids: container_instances.values.map(&:ec2_instance_id)).reservations.each do |r| r.instances.each do |i| ec2_instances[i.instance_id] = i end end tasks.each do |task| @@ -163,24 +161,39 @@ # @return [nil] def remove service = describe_service if service - @ecs.delete_service(cluster: @cluster, service: @app_id) + ecs_client.delete_service(cluster: @cluster, service: @app_id) Hako.logger.info "#{service.service_arn} is deleted" else puts "Service #{@app_id} doesn't exist" end - @elb.destroy + ecs_elb_client.destroy end private + # @return [Aws::ECS::Client] + def ecs_client + @ecs_client ||= Aws::ECS::Client.new(region: @region) + end + + # @return [Aws::EC2::Client] + def ec2_client + @ec2_client ||= Aws::EC2::Client.new(region: @region) + end + + # @return [EcsElb] + def ecs_elb_client + @ecs_elb_client ||= EcsElb.new(@app_id, Aws::ElasticLoadBalancing::Client.new(region: @region), @ecs_elb_options) + end + # @return [Aws::ECS::Types::Service, nil] def describe_service - service = @ecs.describe_services(cluster: @cluster, services: [@app_id]).services[0] + service = ecs_client.describe_services(cluster: @cluster, services: [@app_id]).services[0] if service && service.status != 'INACTIVE' service end end @@ -198,13 +211,13 @@ end # @return [Fixnum] def new_front_port max_port = -1 - @ecs.list_services(cluster: @cluster).each do |page| + ecs_client.list_services(cluster: @cluster).each do |page| unless page.service_arns.empty? - @ecs.describe_services(cluster: @cluster, services: page.service_arns).services.each do |s| + ecs_client.describe_services(cluster: @cluster, services: page.service_arns).services.each do |s| if s.status != 'INACTIVE' port = find_front_port(s) if port max_port = [max_port, port].max end @@ -220,11 +233,11 @@ end # @param [Aws::ECS::Types::Service] service # @return [Fixnum, nil] def find_front_port(service) - task_definition = @ecs.describe_task_definition(task_definition: service.task_definition).task_definition + task_definition = ecs_client.describe_task_definition(task_definition: service.task_definition).task_definition container_definitions = {} task_definition.container_definitions.each do |c| container_definitions[c.name] = c end if container_definitions['front'] @@ -237,11 +250,11 @@ # @return [Boolean] def task_definition_changed?(family, definitions) if @force return true end - task_definition = @ecs.describe_task_definition(task_definition: family).task_definition + task_definition = ecs_client.describe_task_definition(task_definition: family).task_definition container_definitions = {} task_definition.container_definitions.each do |c| container_definitions[c.name] = c end @@ -285,11 +298,11 @@ # @param [Array<Hash>] definitions # @return [Aws::ECS::Types::TaskDefinition, Symbol] def register_task_definition(definitions) if task_definition_changed?(@app_id, definitions) - @ecs.register_task_definition( + ecs_client.register_task_definition( family: @app_id, container_definitions: definitions, volumes: volumes_definition, ).task_definition else @@ -308,11 +321,11 @@ # @param [Array<Hash>] definitions # @return [Aws::ECS::Types::TaskDefinition, Symbol] def register_task_definition_for_oneshot(definitions) family = "#{@app_id}-oneshot" if task_definition_changed?(family, definitions) - @ecs.register_task_definition( + ecs_client.register_task_definition( family: "#{@app_id}-oneshot", container_definitions: definitions, volumes: volumes_definition, ).task_definition else @@ -353,11 +366,11 @@ # @param [Array<String>] commands # @param [Hash<String, String>] env # @return [Aws::ECS::Types::Task] def run_task(task_definition, commands, env) environment = env.map { |k, v| { name: k, value: v } } - @ecs.run_task( + ecs_client.run_task( cluster: @cluster, task_definition: task_definition.task_definition_arn, overrides: { container_overrides: [ { @@ -394,11 +407,11 @@ # @param [Aws::ECS::Types::Task] task # @return [nil] def wait_for_task(task) task_arn = task.task_arn loop do - task = @ecs.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0] + task = ecs_client.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0] if task.nil? Hako.logger.debug "Task #{task_arn} could not be described" sleep 1 next end @@ -429,12 +442,12 @@ end # @param [String] container_instance_arn # @return [nil] def report_container_instance(container_instance_arn) - container_instance = @ecs.describe_container_instances(cluster: @cluster, container_instances: [container_instance_arn]).container_instances[0] - @ec2.describe_tags(filters: [{ name: 'resource-id', values: [container_instance.ec2_instance_id] }]).each do |page| + container_instance = ecs_client.describe_container_instances(cluster: @cluster, container_instances: [container_instance_arn]).container_instances[0] + ec2_client.describe_tags(filters: [{ name: 'resource-id', values: [container_instance.ec2_instance_id] }]).each do |page| tag = page.tags.find { |t| t.key == 'Name' } if tag Hako.logger.info "Container instance is #{container_instance_arn} (#{tag.value} #{container_instance.ec2_instance_id})" else Hako.logger.info "Container instance is #{container_instance_arn} (#{container_instance.ec2_instance_id})" @@ -453,30 +466,30 @@ service_name: @app_id, task_definition: task_definition_arn, desired_count: @desired_count, role: @role, } - name = @elb.find_or_create_load_balancer(front_port) + name = ecs_elb_client.find_or_create_load_balancer(front_port) if name params[:load_balancers] = [ { load_balancer_name: name, container_name: 'front', container_port: 80, }, ] end - @ecs.create_service(params).service + ecs_client.create_service(params).service else params = { cluster: @cluster, service: @app_id, desired_count: @desired_count, task_definition: task_definition_arn, } if service_changed?(service, params) - @ecs.update_service(params).service + ecs_client.update_service(params).service else :noop end end end @@ -499,20 +512,22 @@ # @return [nil] def wait_for_ready(service) latest_event_id = find_latest_event_id(service.events) Hako.logger.debug " latest_event_id=#{latest_event_id}" loop do - s = @ecs.describe_services(cluster: service.cluster_arn, services: [service.service_arn]).services[0] + s = ecs_client.describe_services(cluster: service.cluster_arn, services: [service.service_arn]).services[0] s.events.each do |e| if e.id == latest_event_id break end Hako.logger.info "#{e.created_at}: #{e.message}" end latest_event_id = find_latest_event_id(s.events) - finished = s.deployments.all? { |d| d.status != 'ACTIVE' } - Hako.logger.debug " latest_event_id=#{latest_event_id}, deployment statuses=#{s.deployments.map(&:status)}" - if finished + Hako.logger.debug " latest_event_id=#{latest_event_id}, deployments=#{s.deployments}" + no_active = s.deployments.all? { |d| d.status != 'ACTIVE' } + primary = s.deployments.find { |d| d.status == 'PRIMARY' } + primary_ready = primary && primary.running_count == primary.desired_count + if no_active && primary_ready return else sleep 1 end end