lib/elasticity/job_flow.rb in elasticity-5.0.3 vs lib/elasticity/job_flow.rb in elasticity-6.0

- old
+ new

@@ -23,23 +23,18 @@ attr_accessor :visible_to_all_users attr_accessor :enable_debugging attr_accessor :job_flow_role attr_accessor :service_role - attr_reader :access_key - attr_reader :secret_key - - def initialize(access=nil, secret=nil) + def initialize @action_on_failure = 'TERMINATE_JOB_FLOW' @name = 'Elasticity Job Flow' @ami_version = 'latest' @keep_job_flow_alive_when_no_steps = false self.placement = 'us-east-1a' @enable_debugging = false - @access_key = access - @secret_key = secret @visible_to_all_users = false @bootstrap_actions = [] @jobflow_steps = [] @installed_steps = [] @@ -48,20 +43,17 @@ set_master_instance_group(Elasticity::InstanceGroup.new) set_core_instance_group(Elasticity::InstanceGroup.new) @instance_count = 2 @master_instance_type = 'm1.small' @slave_instance_type = 'm1.small' - - @access_key = access - @secret_key = secret end - def self.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1') - JobFlow.new(access, secret).tap do |j| + def self.from_jobflow_id(jobflow_id, region = 'us-east-1') + JobFlow.new.tap do |j| j.instance_variable_set(:@region, region) j.instance_variable_set(:@jobflow_id, jobflow_id) - j.instance_variable_set(:@installed_steps, j.status.installed_steps) + j.instance_variable_set(:@installed_steps, ClusterStepStatus.installed_steps(j.cluster_step_status)) end end def placement=(new_placement) @placement = new_placement @@ -152,30 +144,37 @@ raise JobFlowNotStartedError, 'Cannot #shutdown a job flow that has not yet been #run.' end emr.terminate_jobflows(@jobflow_id) end - def status + def cluster_status if !is_jobflow_running? raise JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.' end - emr.describe_jobflow(@jobflow_id) + ClusterStatus.from_aws_data(emr.describe_cluster(@jobflow_id)) end + def cluster_step_status + if !is_jobflow_running? + raise JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.' + end + ClusterStepStatus.from_aws_list_data(emr.list_steps(@jobflow_id)) + end + def wait_for_completion(&on_wait) l = Elasticity::Looper.new(method(:retry_check), on_wait) l.go end private def retry_check - jf_status = status - return status.active?, jf_status + jf_status = cluster_status + return cluster_status.active?, jf_status end def emr - @emr ||= Elasticity::EMR.new(@access_key, @secret_key, :region => @region) + @emr ||= Elasticity::EMR.new(:region => @region) end def is_jobflow_running? !@jobflow_id.nil? end