lib/elasticity/job_flow.rb in elasticity-2.0 vs lib/elasticity/job_flow.rb in elasticity-2.1
- old
+ new
@@ -20,34 +20,65 @@
def initialize(access, secret)
@action_on_failure = 'TERMINATE_JOB_FLOW'
@ec2_key_name = 'default'
@hadoop_version = '0.20.205'
- @instance_count = 2
- @master_instance_type = 'm1.small'
@name = 'Elasticity Job Flow'
- @slave_instance_type = 'm1.small'
@ami_version = 'latest'
@keep_job_flow_alive_when_no_steps = false
- @emr = Elasticity::EMR.new(access, secret)
-
@bootstrap_actions = []
@jobflow_steps = []
@installed_steps = []
+
+ @instance_groups = {}
+ set_master_instance_group(Elasticity::InstanceGroup.new)
+ set_core_instance_group(Elasticity::InstanceGroup.new)
+
+ @instance_count = 2
+ @master_instance_type = 'm1.small'
+ @slave_instance_type = 'm1.small'
+
+ @emr = Elasticity::EMR.new(access, secret)
end
def instance_count=(count)
raise ArgumentError, 'Instance count cannot be set to less than 2 (requested 1)' unless count > 1
+ @instance_groups[:core].count = count - 1
@instance_count = count
end
+ def master_instance_type=(type)
+ @instance_groups[:master].type = type
+ @master_instance_type = type
+ end
+
+ def slave_instance_type=(type)
+ @instance_groups[:core].type = type
+ @slave_instance_type = type
+ end
+
def add_bootstrap_action(bootstrap_action)
raise_if is_jobflow_running?, JobFlowRunningError, 'To modify bootstrap actions, please create a new job flow.'
@bootstrap_actions << bootstrap_action
end
+ def set_master_instance_group(instance_group)
+ instance_group.role = 'MASTER'
+ @instance_groups[:master] = instance_group
+ end
+
+ def set_core_instance_group(instance_group)
+ instance_group.role = 'CORE'
+ @instance_groups[:core] = instance_group
+ end
+
+ def set_task_instance_group(instance_group)
+ instance_group.role = 'TASK'
+ @instance_groups[:task] = instance_group
+ end
+
def add_step(jobflow_step)
if is_jobflow_running?
jobflow_steps = []
if jobflow_step.class.send(:requires_installation?) && !@installed_steps.include?(jobflow_step.class)
jobflow_steps << jobflow_step.class.send(:aws_installation_step)
@@ -88,24 +119,22 @@
config[:bootstrap_actions] = @bootstrap_actions.map{|a| a.to_aws_bootstrap_action} unless @bootstrap_actions.empty?
config
end
def jobflow_preamble
- {
+ preamble = {
:name => @name,
:ami_version => @ami_version,
:instances => {
:keep_job_flow_alive_when_no_steps => @keep_job_flow_alive_when_no_steps,
:ec2_key_name => @ec2_key_name,
:hadoop_version => @hadoop_version,
- :instance_count => @instance_count,
- :master_instance_type => @master_instance_type,
- :slave_instance_type => @slave_instance_type,
+ :instance_groups => jobflow_instance_groups
}
- }.tap do |preamble|
- preamble.merge!(:ec2_subnet_id => @ec2_subnet_id) if @ec2_subnet_id
- end
+ }
+ preamble.merge!(:ec2_subnet_id => @ec2_subnet_id) if @ec2_subnet_id
+ preamble
end
def jobflow_steps
steps = []
@jobflow_steps.each do |step|
@@ -114,9 +143,14 @@
@installed_steps << step.class
end
steps << step.to_aws_step(self)
end
steps
+ end
+
+ def jobflow_instance_groups
+ groups = [:master, :core, :task].map{|role| @instance_groups[role]}.compact
+ groups.map(&:to_aws_instance_config)
end
end
end
\ No newline at end of file