lib/elasticity/job_flow.rb in elasticity-2.5.6 vs lib/elasticity/job_flow.rb in elasticity-2.6
- old
+ new
@@ -1,10 +1,11 @@
module Elasticity
class JobFlowRunningError < StandardError; end
class JobFlowNotStartedError < StandardError; end
class JobFlowMissingStepsError < StandardError; end
+ class LogUriMissingError < StandardError; end
class JobFlow
attr_accessor :action_on_failure
attr_accessor :ec2_key_name
@@ -17,10 +18,11 @@
attr_accessor :ami_version
attr_accessor :keep_job_flow_alive_when_no_steps
attr_accessor :ec2_subnet_id
attr_accessor :placement
attr_accessor :visible_to_all_users
+ attr_accessor :enable_debugging
attr_reader :access_key
attr_reader :secret_key
def initialize(access=nil, secret=nil)
@@ -28,10 +30,11 @@
@hadoop_version = '1.0.3'
@name = 'Elasticity Job Flow'
@ami_version = 'latest'
@keep_job_flow_alive_when_no_steps = false
@placement = 'us-east-1a'
+ @enable_debugging = false
@access_key = access
@secret_key = secret
@visible_to_all_users = false
@@ -56,10 +59,17 @@
j.instance_variable_set(:@jobflow_id, jobflow_id)
j.instance_variable_set(:@installed_steps, j.status.installed_steps)
end
end
+ def enable_debugging=(enabled)
+ if enabled
+ raise LogUriMissingError, 'To enable debugging, please set a #log_uri' unless @log_uri
+ end
+ @enable_debugging = enabled
+ end
+
def instance_count=(count)
raise ArgumentError, "Instance count cannot be set to less than 2 (requested #{count})" unless count > 1
@instance_groups[:core].count = count - 1
@instance_count = count
end
@@ -128,12 +138,22 @@
raise JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.'
end
emr.describe_jobflow(@jobflow_id)
end
+ def wait_for_completion(&on_wait)
+ l = Elasticity::Looper.new(method(:retry_check), on_wait)
+ l.go
+ end
+
private
+ def retry_check
+ jf_status = status
+ return status.state == 'RUNNING' || status.state == 'STARTING', jf_status
+ end
+
def emr
@region ||= @placement.match(/(\w+-\w+-\d+)/)[0]
@emr ||= Elasticity::EMR.new(@access_key, @secret_key, :region => @region)
end
@@ -141,10 +161,12 @@
!@jobflow_id.nil?
end
def jobflow_config
config = jobflow_preamble
- config[:steps] = jobflow_steps
+ steps = jobflow_steps
+ steps.insert(0, Elasticity::SetupHadoopDebuggingStep.new.to_aws_step(self)) if @enable_debugging
+ config[:steps] = steps
config[:log_uri] = @log_uri if @log_uri
config[:bootstrap_actions] = @bootstrap_actions.map{|a| a.to_aws_bootstrap_action} unless @bootstrap_actions.empty?
config
end
\ No newline at end of file