lib/hako/schedulers/ecs.rb in hako-0.14.0 vs lib/hako/schedulers/ecs.rb in hako-0.14.1
- old
+ new
@@ -15,15 +15,13 @@
# @param [Hash<String, Object>] options
def configure(options)
@cluster = options.fetch('cluster', DEFAULT_CLUSTER)
@desired_count = options.fetch('desired_count', nil)
- region = options.fetch('region') { validation_error!('region must be set') }
+ @region = options.fetch('region') { validation_error!('region must be set') }
@role = options.fetch('role', nil)
- @ecs = Aws::ECS::Client.new(region: region)
- @elb = EcsElb.new(@app_id, Aws::ElasticLoadBalancing::Client.new(region: region), options.fetch('elb', nil))
- @ec2 = Aws::EC2::Client.new(region: region)
+ @ecs_elb_options = options.fetch('elb', nil)
@started_at = nil
@container_instance_arn = nil
end
# @param [Hash<String, Container>] containers
@@ -42,11 +40,11 @@
end
else
task_definition = register_task_definition(definitions)
if task_definition == :noop
Hako.logger.info "Task definition isn't changed"
- task_definition = @ecs.describe_task_definition(task_definition: @app_id).task_definition
+ task_definition = ecs_client.describe_task_definition(task_definition: @app_id).task_definition
else
Hako.logger.info "Registered task definition: #{task_definition.task_definition_arn}"
end
service = create_or_update_service(task_definition.task_definition_arn, front_port)
if service == :noop
@@ -80,11 +78,11 @@
0
else
task_definition = register_task_definition_for_oneshot(definitions)
if task_definition == :noop
Hako.logger.info "Task definition isn't changed"
- task_definition = @ecs.describe_task_definition(task_definition: "#{@app_id}-oneshot").task_definition
+ task_definition = ecs_client.describe_task_definition(task_definition: "#{@app_id}-oneshot").task_definition
else
Hako.logger.info "Registered task definition: #{task_definition.task_definition_arn}"
end
@task = run_task(task_definition, commands, env)
Hako.logger.info "Started task: #{@task.task_arn}"
@@ -95,11 +93,11 @@
# @return [nil]
def stop_oneshot
if @task
Hako.logger.warn "Stopping #{@task.task_arn}"
- @ecs.stop_task(cluster: @cluster, task: @task.task_arn, reason: 'Stopped by hako stop_oneshot')
+ ecs_client.stop_task(cluster: @cluster, task: @task.task_arn, reason: 'Stopped by hako stop_oneshot')
wait_for_oneshot_finish
end
end
# @return [nil]
@@ -110,11 +108,11 @@
exit 1
end
unless service.load_balancers.empty?
lb = service.load_balancers[0]
- lb_detail = @elb.describe_load_balancer
+ lb_detail = ecs_elb_client.describe_load_balancer
puts 'Load balancer:'
lb_detail.listener_descriptions.each do |ld|
l = ld.listener
puts " #{lb_detail.dns_name}:#{l.load_balancer_port} -> #{lb.container_name}:#{lb.container_port}"
end
@@ -125,19 +123,19 @@
abbrev_task_definition = d.task_definition.slice(%r{task-definition/(.+)\z}, 1)
puts " [#{d.status}] #{abbrev_task_definition} desired_count=#{d.desired_count}, pending_count=#{d.pending_count}, running_count=#{d.running_count}"
end
puts 'Tasks:'
- @ecs.list_tasks(cluster: @cluster, service_name: service.service_arn).each do |page|
+ ecs_client.list_tasks(cluster: @cluster, service_name: service.service_arn).each do |page|
unless page.task_arns.empty?
- tasks = @ecs.describe_tasks(cluster: @cluster, tasks: page.task_arns).tasks
+ tasks = ecs_client.describe_tasks(cluster: @cluster, tasks: page.task_arns).tasks
container_instances = {}
- @ecs.describe_container_instances(cluster: @cluster, container_instances: tasks.map(&:container_instance_arn)).container_instances.each do |ci|
+ ecs_client.describe_container_instances(cluster: @cluster, container_instances: tasks.map(&:container_instance_arn)).container_instances.each do |ci|
container_instances[ci.container_instance_arn] = ci
end
ec2_instances = {}
- @ec2.describe_instances(instance_ids: container_instances.values.map(&:ec2_instance_id)).reservations.each do |r|
+ ec2_client.describe_instances(instance_ids: container_instances.values.map(&:ec2_instance_id)).reservations.each do |r|
r.instances.each do |i|
ec2_instances[i.instance_id] = i
end
end
tasks.each do |task|
@@ -163,24 +161,39 @@
# @return [nil]
def remove
service = describe_service
if service
- @ecs.delete_service(cluster: @cluster, service: @app_id)
+ ecs_client.delete_service(cluster: @cluster, service: @app_id)
Hako.logger.info "#{service.service_arn} is deleted"
else
puts "Service #{@app_id} doesn't exist"
end
- @elb.destroy
+ ecs_elb_client.destroy
end
private
+ # @return [Aws::ECS::Client]
+ def ecs_client
+ @ecs_client ||= Aws::ECS::Client.new(region: @region)
+ end
+
+ # @return [Aws::EC2::Client]
+ def ec2_client
+ @ec2_client ||= Aws::EC2::Client.new(region: @region)
+ end
+
+ # @return [EcsElb]
+ def ecs_elb_client
+ @ecs_elb_client ||= EcsElb.new(@app_id, Aws::ElasticLoadBalancing::Client.new(region: @region), @ecs_elb_options)
+ end
+
# @return [Aws::ECS::Types::Service, nil]
def describe_service
- service = @ecs.describe_services(cluster: @cluster, services: [@app_id]).services[0]
+ service = ecs_client.describe_services(cluster: @cluster, services: [@app_id]).services[0]
if service && service.status != 'INACTIVE'
service
end
end
@@ -198,13 +211,13 @@
end
# @return [Fixnum]
def new_front_port
max_port = -1
- @ecs.list_services(cluster: @cluster).each do |page|
+ ecs_client.list_services(cluster: @cluster).each do |page|
unless page.service_arns.empty?
- @ecs.describe_services(cluster: @cluster, services: page.service_arns).services.each do |s|
+ ecs_client.describe_services(cluster: @cluster, services: page.service_arns).services.each do |s|
if s.status != 'INACTIVE'
port = find_front_port(s)
if port
max_port = [max_port, port].max
end
@@ -220,11 +233,11 @@
end
# @param [Aws::ECS::Types::Service] service
# @return [Fixnum, nil]
def find_front_port(service)
- task_definition = @ecs.describe_task_definition(task_definition: service.task_definition).task_definition
+ task_definition = ecs_client.describe_task_definition(task_definition: service.task_definition).task_definition
container_definitions = {}
task_definition.container_definitions.each do |c|
container_definitions[c.name] = c
end
if container_definitions['front']
@@ -237,11 +250,11 @@
# @return [Boolean]
def task_definition_changed?(family, definitions)
if @force
return true
end
- task_definition = @ecs.describe_task_definition(task_definition: family).task_definition
+ task_definition = ecs_client.describe_task_definition(task_definition: family).task_definition
container_definitions = {}
task_definition.container_definitions.each do |c|
container_definitions[c.name] = c
end
@@ -285,11 +298,11 @@
# @param [Array<Hash>] definitions
# @return [Aws::ECS::Types::TaskDefinition, Symbol]
def register_task_definition(definitions)
if task_definition_changed?(@app_id, definitions)
- @ecs.register_task_definition(
+ ecs_client.register_task_definition(
family: @app_id,
container_definitions: definitions,
volumes: volumes_definition,
).task_definition
else
@@ -308,11 +321,11 @@
# @param [Array<Hash>] definitions
# @return [Aws::ECS::Types::TaskDefinition, Symbol]
def register_task_definition_for_oneshot(definitions)
family = "#{@app_id}-oneshot"
if task_definition_changed?(family, definitions)
- @ecs.register_task_definition(
+ ecs_client.register_task_definition(
family: "#{@app_id}-oneshot",
container_definitions: definitions,
volumes: volumes_definition,
).task_definition
else
@@ -353,11 +366,11 @@
# @param [Array<String>] commands
# @param [Hash<String, String>] env
# @return [Aws::ECS::Types::Task]
def run_task(task_definition, commands, env)
environment = env.map { |k, v| { name: k, value: v } }
- @ecs.run_task(
+ ecs_client.run_task(
cluster: @cluster,
task_definition: task_definition.task_definition_arn,
overrides: {
container_overrides: [
{
@@ -394,11 +407,11 @@
# @param [Aws::ECS::Types::Task] task
# @return [nil]
def wait_for_task(task)
task_arn = task.task_arn
loop do
- task = @ecs.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0]
+ task = ecs_client.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0]
if task.nil?
Hako.logger.debug "Task #{task_arn} could not be described"
sleep 1
next
end
@@ -429,12 +442,12 @@
end
# @param [String] container_instance_arn
# @return [nil]
def report_container_instance(container_instance_arn)
- container_instance = @ecs.describe_container_instances(cluster: @cluster, container_instances: [container_instance_arn]).container_instances[0]
- @ec2.describe_tags(filters: [{ name: 'resource-id', values: [container_instance.ec2_instance_id] }]).each do |page|
+ container_instance = ecs_client.describe_container_instances(cluster: @cluster, container_instances: [container_instance_arn]).container_instances[0]
+ ec2_client.describe_tags(filters: [{ name: 'resource-id', values: [container_instance.ec2_instance_id] }]).each do |page|
tag = page.tags.find { |t| t.key == 'Name' }
if tag
Hako.logger.info "Container instance is #{container_instance_arn} (#{tag.value} #{container_instance.ec2_instance_id})"
else
Hako.logger.info "Container instance is #{container_instance_arn} (#{container_instance.ec2_instance_id})"
@@ -453,30 +466,30 @@
service_name: @app_id,
task_definition: task_definition_arn,
desired_count: @desired_count,
role: @role,
}
- name = @elb.find_or_create_load_balancer(front_port)
+ name = ecs_elb_client.find_or_create_load_balancer(front_port)
if name
params[:load_balancers] = [
{
load_balancer_name: name,
container_name: 'front',
container_port: 80,
},
]
end
- @ecs.create_service(params).service
+ ecs_client.create_service(params).service
else
params = {
cluster: @cluster,
service: @app_id,
desired_count: @desired_count,
task_definition: task_definition_arn,
}
if service_changed?(service, params)
- @ecs.update_service(params).service
+ ecs_client.update_service(params).service
else
:noop
end
end
end
@@ -499,20 +512,22 @@
# @return [nil]
def wait_for_ready(service)
latest_event_id = find_latest_event_id(service.events)
Hako.logger.debug " latest_event_id=#{latest_event_id}"
loop do
- s = @ecs.describe_services(cluster: service.cluster_arn, services: [service.service_arn]).services[0]
+ s = ecs_client.describe_services(cluster: service.cluster_arn, services: [service.service_arn]).services[0]
s.events.each do |e|
if e.id == latest_event_id
break
end
Hako.logger.info "#{e.created_at}: #{e.message}"
end
latest_event_id = find_latest_event_id(s.events)
- finished = s.deployments.all? { |d| d.status != 'ACTIVE' }
- Hako.logger.debug " latest_event_id=#{latest_event_id}, deployment statuses=#{s.deployments.map(&:status)}"
- if finished
+ Hako.logger.debug " latest_event_id=#{latest_event_id}, deployments=#{s.deployments}"
+ no_active = s.deployments.all? { |d| d.status != 'ACTIVE' }
+ primary = s.deployments.find { |d| d.status == 'PRIMARY' }
+ primary_ready = primary && primary.running_count == primary.desired_count
+ if no_active && primary_ready
return
else
sleep 1
end
end