lib/elasticity/job_flow.rb in elasticity-5.0.3 vs lib/elasticity/job_flow.rb in elasticity-6.0
- old
+ new
@@ -23,23 +23,18 @@
attr_accessor :visible_to_all_users
attr_accessor :enable_debugging
attr_accessor :job_flow_role
attr_accessor :service_role
- attr_reader :access_key
- attr_reader :secret_key
-
- def initialize(access=nil, secret=nil)
+ def initialize
@action_on_failure = 'TERMINATE_JOB_FLOW'
@name = 'Elasticity Job Flow'
@ami_version = 'latest'
@keep_job_flow_alive_when_no_steps = false
self.placement = 'us-east-1a'
@enable_debugging = false
- @access_key = access
- @secret_key = secret
@visible_to_all_users = false
@bootstrap_actions = []
@jobflow_steps = []
@installed_steps = []
@@ -48,20 +43,17 @@
set_master_instance_group(Elasticity::InstanceGroup.new)
set_core_instance_group(Elasticity::InstanceGroup.new)
@instance_count = 2
@master_instance_type = 'm1.small'
@slave_instance_type = 'm1.small'
-
- @access_key = access
- @secret_key = secret
end
- def self.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1')
- JobFlow.new(access, secret).tap do |j|
+ def self.from_jobflow_id(jobflow_id, region = 'us-east-1')
+ JobFlow.new.tap do |j|
j.instance_variable_set(:@region, region)
j.instance_variable_set(:@jobflow_id, jobflow_id)
- j.instance_variable_set(:@installed_steps, j.status.installed_steps)
+ j.instance_variable_set(:@installed_steps, ClusterStepStatus.installed_steps(j.cluster_step_status))
end
end
def placement=(new_placement)
@placement = new_placement
@@ -152,30 +144,37 @@
raise JobFlowNotStartedError, 'Cannot #shutdown a job flow that has not yet been #run.'
end
emr.terminate_jobflows(@jobflow_id)
end
- def status
+ def cluster_status
if !is_jobflow_running?
raise JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.'
end
- emr.describe_jobflow(@jobflow_id)
+ ClusterStatus.from_aws_data(emr.describe_cluster(@jobflow_id))
end
+ def cluster_step_status
+ if !is_jobflow_running?
+ raise JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.'
+ end
+ ClusterStepStatus.from_aws_list_data(emr.list_steps(@jobflow_id))
+ end
+
def wait_for_completion(&on_wait)
l = Elasticity::Looper.new(method(:retry_check), on_wait)
l.go
end
private
def retry_check
- jf_status = status
- return status.active?, jf_status
+ jf_status = cluster_status
+ return cluster_status.active?, jf_status
end
def emr
- @emr ||= Elasticity::EMR.new(@access_key, @secret_key, :region => @region)
+ @emr ||= Elasticity::EMR.new(:region => @region)
end
def is_jobflow_running?
!@jobflow_id.nil?
end