lib/elasticity/job_flow.rb in elasticity-2.2 vs lib/elasticity/job_flow.rb in elasticity-2.3
- old
+ new
@@ -15,43 +15,49 @@
attr_accessor :master_instance_type
attr_accessor :slave_instance_type
attr_accessor :ami_version
attr_accessor :keep_job_flow_alive_when_no_steps
attr_accessor :ec2_subnet_id
+ attr_accessor :placement
def initialize(access, secret)
@action_on_failure = 'TERMINATE_JOB_FLOW'
@ec2_key_name = 'default'
@hadoop_version = '0.20.205'
@name = 'Elasticity Job Flow'
@ami_version = 'latest'
@keep_job_flow_alive_when_no_steps = false
+ @placement = 'us-east-1a'
+ @access = access
+ @secret = secret
+
@bootstrap_actions = []
@jobflow_steps = []
@installed_steps = []
@instance_groups = {}
set_master_instance_group(Elasticity::InstanceGroup.new)
set_core_instance_group(Elasticity::InstanceGroup.new)
-
@instance_count = 2
@master_instance_type = 'm1.small'
@slave_instance_type = 'm1.small'
- @emr = Elasticity::EMR.new(access, secret)
+ @access = access
+ @secret = secret
end
- def self.from_jobflow_id(access, secret, jobflow_id)
+ def self.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1')
JobFlow.new(access, secret).tap do |j|
+ j.instance_variable_set(:@region, region)
j.instance_variable_set(:@jobflow_id, jobflow_id)
j.instance_variable_set(:@installed_steps, j.status.installed_steps)
end
end
def instance_count=(count)
- raise ArgumentError, 'Instance count cannot be set to less than 2 (requested 1)' unless count > 1
+ raise ArgumentError, "Instance count cannot be set to less than 2 (requested #{count})" unless count > 1
@instance_groups[:core].count = count - 1
@instance_count = count
end
def master_instance_type=(type)
@@ -89,32 +95,37 @@
jobflow_steps = []
if jobflow_step.requires_installation? && !@installed_steps.include?(jobflow_step.class)
jobflow_steps << jobflow_step.aws_installation_step
end
jobflow_steps << jobflow_step.to_aws_step(self)
- @emr.add_jobflow_steps(@jobflow_id, {:steps => jobflow_steps})
+ emr.add_jobflow_steps(@jobflow_id, {:steps => jobflow_steps})
else
@jobflow_steps << jobflow_step
end
end
def run
raise_if @jobflow_steps.empty?, JobFlowMissingStepsError, 'Cannot run a job flow without adding steps. Please use #add_step.'
raise_if is_jobflow_running?, JobFlowRunningError, 'Cannot run a job flow multiple times. To do more with this job flow, please use #add_step.'
- @jobflow_id = @emr.run_job_flow(jobflow_config)
+ @jobflow_id = emr.run_job_flow(jobflow_config)
end
def shutdown
raise_unless is_jobflow_running?, JobFlowNotStartedError, 'Cannot #shutdown a job flow that has not yet been #run.'
- @emr.terminate_jobflows(@jobflow_id)
+ emr.terminate_jobflows(@jobflow_id)
end
def status
raise_unless is_jobflow_running?, JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.'
- @emr.describe_jobflow(@jobflow_id)
+ emr.describe_jobflow(@jobflow_id)
end
private
+
+ def emr
+ @region ||= @placement.match(/(\w+-\w+-\d+)/)[0]
+ @emr ||= Elasticity::EMR.new(@access, @secret, :region => @region)
+ end
def is_jobflow_running?
!@jobflow_id.nil?
end
\ No newline at end of file