lib/hako/schedulers/ecs.rb in hako-0.13.3 vs lib/hako/schedulers/ecs.rb in hako-0.14.0
- old
+ new
@@ -11,23 +11,29 @@
DEFAULT_CLUSTER = 'default'
DEFAULT_FRONT_PORT = 10000
attr_reader :task
+ # @param [Hash<String, Object>] options
def configure(options)
@cluster = options.fetch('cluster', DEFAULT_CLUSTER)
- @desired_count = options.fetch('desired_count') { validation_error!('desired_count must be set') }
+ @desired_count = options.fetch('desired_count', nil)
region = options.fetch('region') { validation_error!('region must be set') }
@role = options.fetch('role', nil)
@ecs = Aws::ECS::Client.new(region: region)
@elb = EcsElb.new(@app_id, Aws::ElasticLoadBalancing::Client.new(region: region), options.fetch('elb', nil))
@ec2 = Aws::EC2::Client.new(region: region)
@started_at = nil
@container_instance_arn = nil
end
+ # @param [Hash<String, Container>] containers
+ # @return [nil]
def deploy(containers)
+ unless @desired_count
+ validation_error!('desired_count must be set')
+ end
front_port = determine_front_port
@scripts.each { |script| script.deploy_started(containers, front_port) }
definitions = create_definitions(containers)
if @dry_run
@@ -51,10 +57,14 @@
end
Hako.logger.info 'Deployment completed'
end
end
+ # @param [Hash<String, Container>] containers
+ # @param [Array<String>] commands
+ # @param [Hash<String, String>] env
+ # @return [nil]
def oneshot(containers, commands, env)
definitions = create_definitions(containers)
definitions.each do |definition|
definition.delete(:essential)
end
@@ -81,18 +91,20 @@
@scripts.each { |script| script.oneshot_started(self) }
wait_for_oneshot_finish
end
end
+ # @return [nil]
def stop_oneshot
if @task
Hako.logger.warn "Stopping #{@task.task_arn}"
@ecs.stop_task(cluster: @cluster, task: @task.task_arn, reason: 'Stopped by hako stop_oneshot')
wait_for_oneshot_finish
end
end
+ # @return [nil]
def status
service = describe_service
unless service
puts 'Unavailable'
exit 1
@@ -147,10 +159,11 @@
service.events.first(10).each do |e|
puts " #{e.created_at}: #{e.message}"
end
end
+ # @return [nil]
def remove
service = describe_service
if service
@ecs.delete_service(cluster: @cluster, service: @app_id)
Hako.logger.info "#{service.service_arn} is deleted"
@@ -161,17 +174,19 @@
@elb.destroy
end
private
+ # @return [Aws::ECS::Types::Service, nil]
def describe_service
service = @ecs.describe_services(cluster: @cluster, services: [@app_id]).services[0]
if service && service.status != 'INACTIVE'
service
end
end
+ # @return [Fixnum]
def determine_front_port
if @dry_run
return DEFAULT_FRONT_PORT
end
service = describe_service
@@ -180,10 +195,11 @@
else
new_front_port
end
end
+ # @return [Fixnum]
def new_front_port
max_port = -1
@ecs.list_services(cluster: @cluster).each do |page|
unless page.service_arns.empty?
@ecs.describe_services(cluster: @cluster, services: page.service_arns).services.each do |s|
@@ -201,10 +217,12 @@
else
max_port + 1
end
end
+ # @param [Aws::ECS::Types::Service] service
+ # @return [Fixnum, nil]
def find_front_port(service)
task_definition = @ecs.describe_task_definition(task_definition: service.task_definition).task_definition
container_definitions = {}
task_definition.container_definitions.each do |c|
container_definitions[c.name] = c
@@ -212,10 +230,13 @@
if container_definitions['front']
container_definitions['front'].port_mappings[0].host_port
end
end
+ # @param [String] family
+ # @param [Array<Hash>] definitions
+ # @return [Boolean]
def task_definition_changed?(family, definitions)
if @force
return true
end
task_definition = @ecs.describe_task_definition(task_definition: family).task_definition
@@ -234,10 +255,12 @@
rescue Aws::ECS::Errors::ClientException
# Task definition does not exist
true
end
+ # @param [Hash<String, Hash<String, String>>] actual_volumes
+ # @return [Boolean]
def different_volumes?(actual_volumes)
if @volumes.size != actual_volumes.size
return true
end
actual_volumes.each do |actual_volume|
@@ -251,14 +274,19 @@
end
false
end
+ # @param [Hash] expected_container
+ # @param [Aws::ECS::Types::ContainerDefinition] actual_container
+ # @return [Boolean]
def different_definition?(expected_container, actual_container)
EcsDefinitionComparator.new(expected_container).different?(actual_container)
end
+ # @param [Array<Hash>] definitions
+ # @return [Aws::ECS::Types::TaskDefinition, Symbol]
def register_task_definition(definitions)
if task_definition_changed?(@app_id, definitions)
@ecs.register_task_definition(
family: @app_id,
container_definitions: definitions,
@@ -267,16 +295,20 @@
else
:noop
end
end
+ # @param [Hash<String, Container>] containers
+ # @return [nil]
def create_definitions(containers)
containers.map do |name, container|
create_definition(name, container)
end
end
+ # @param [Array<Hash>] definitions
+ # @return [Aws::ECS::Types::TaskDefinition, Symbol]
def register_task_definition_for_oneshot(definitions)
family = "#{@app_id}-oneshot"
if task_definition_changed?(family, definitions)
@ecs.register_task_definition(
family: "#{@app_id}-oneshot",
@@ -286,19 +318,23 @@
else
:noop
end
end
+ # @return [Hash]
def volumes_definition
@volumes.map do |name, volume|
{
name: name,
host: { source_path: volume['source_path'] },
}
end
end
+ # @param [String] name
+ # @param [Container] container
+ # @return [Hash]
def create_definition(name, container)
environment = container.env.map { |k, v| { name: k, value: v } }
{
name: name,
image: container.image_tag,
@@ -311,10 +347,14 @@
docker_labels: container.docker_labels,
mount_points: container.mount_points,
}
end
+ # @param [Aws::ECS::Types::TaskDefinition] task_definition
+ # @param [Array<String>] commands
+ # @param [Hash<String, String>] env
+ # @return [Aws::ECS::Types::Task]
def run_task(task_definition, commands, env)
environment = env.map { |k, v| { name: k, value: v } }
@ecs.run_task(
cluster: @cluster,
task_definition: task_definition.task_definition_arn,
@@ -330,10 +370,11 @@
count: 1,
started_by: 'hako oneshot',
).tasks[0]
end
+ # @return [Fixnum]
def wait_for_oneshot_finish
containers = wait_for_task(@task)
@task = nil
Hako.logger.info 'Oneshot task finished'
exit_code = 127
@@ -348,10 +389,12 @@
end
end
exit_code
end
+ # @param [Aws::ECS::Types::Task] task
+ # @return [nil]
def wait_for_task(task)
task_arn = task.task_arn
loop do
task = @ecs.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0]
if task.nil?
@@ -383,10 +426,12 @@
end
sleep 1
end
end
+ # @param [String] container_instance_arn
+ # @return [nil]
def report_container_instance(container_instance_arn)
container_instance = @ecs.describe_container_instances(cluster: @cluster, container_instances: [container_instance_arn]).container_instances[0]
@ec2.describe_tags(filters: [{ name: 'resource-id', values: [container_instance.ec2_instance_id] }]).each do |page|
tag = page.tags.find { |t| t.key == 'Name' }
if tag
@@ -395,10 +440,13 @@
Hako.logger.info "Container instance is #{container_instance_arn} (#{container_instance.ec2_instance_id})"
end
end
end
+ # @param [String] task_definition_arn
+ # @param [Fixnum] front_port
+ # @return [Aws::ECS::Types::Service, Symbol]
def create_or_update_service(task_definition_arn, front_port)
service = describe_service
if service.nil?
params = {
cluster: @cluster,
@@ -433,38 +481,47 @@
end
end
SERVICE_KEYS = %i[desired_count task_definition].freeze
+ # @param [Aws::ECS::Types::Service] service
+ # @param [Hash] params
+ # @return [Boolean]
def service_changed?(service, params)
SERVICE_KEYS.each do |key|
if service.public_send(key) != params[key]
return true
end
end
false
end
+ # @param [Aws::ECS::Types::Service] service
+ # @return [nil]
def wait_for_ready(service)
latest_event_id = find_latest_event_id(service.events)
+ Hako.logger.debug " latest_event_id=#{latest_event_id}"
loop do
s = @ecs.describe_services(cluster: service.cluster_arn, services: [service.service_arn]).services[0]
s.events.each do |e|
if e.id == latest_event_id
break
end
Hako.logger.info "#{e.created_at}: #{e.message}"
end
latest_event_id = find_latest_event_id(s.events)
finished = s.deployments.all? { |d| d.status != 'ACTIVE' }
+ Hako.logger.debug " latest_event_id=#{latest_event_id}, deployment statuses=#{s.deployments.map(&:status)}"
if finished
return
else
sleep 1
end
end
end
+ # @param [Array<Aws::ECS::Types::ServiceEvent>] events
+ # @return [String, nil]
def find_latest_event_id(events)
if events.empty?
nil
else
events[0].id