lib/elasticity/job_flow.rb in elasticity-1.5 vs lib/elasticity/job_flow.rb in elasticity-2.0
- old
+ new
@@ -1,52 +1,122 @@
module Elasticity
+ # Error used by JobFlow#add_bootstrap_action and JobFlow#run when the job
+ # flow has already been launched.
+ class JobFlowRunningError < StandardError
+ end
+
+ # Error used by JobFlow#shutdown and JobFlow#status when the job flow has
+ # not been launched yet.
+ class JobFlowNotStartedError < StandardError
+ end
+
+ # Error used by JobFlow#run when no steps have been added.
+ class JobFlowMissingStepsError < StandardError
+ end
+
class JobFlow
+ attr_accessor :action_on_failure # defaults to 'TERMINATE_JOB_FLOW' in #initialize
+ attr_accessor :ec2_key_name # defaults to 'default'; sent under :instances when the job flow is run
attr_accessor :name
- attr_accessor :jobflow_id
- attr_accessor :state
- attr_accessor :steps
- attr_accessor :created_at
- attr_accessor :started_at
- attr_accessor :ready_at
+ attr_accessor :hadoop_version # defaults to '0.20.205' in #initialize
attr_accessor :instance_count
+ attr_accessor :log_uri # no default; added to the run configuration only when set
attr_accessor :master_instance_type
attr_accessor :slave_instance_type
- attr_accessor :last_state_change_reason
+ attr_accessor :ami_version # defaults to 'latest' in #initialize
+ attr_accessor :keep_job_flow_alive_when_no_steps # defaults to false in #initialize
+ attr_accessor :ec2_subnet_id # no default; merged into the run configuration only when set
- def initialize
- @steps = []
+ # Creates a job flow that has not been launched yet, populated with default
+ # settings that can be changed through the accessors before calling #run.
+ #
+ # @param access first argument forwarded to Elasticity::EMR.new
+ #   (presumably the AWS access key - confirm against Elasticity::EMR)
+ # @param secret second argument forwarded to Elasticity::EMR.new
+ #   (presumably the AWS secret key - confirm against Elasticity::EMR)
+ def initialize(access, secret)
+   # Client used for every EMR request this job flow makes.
+   @emr = Elasticity::EMR.new(access, secret)
+
+   # Nothing is queued or recorded as installed until the caller adds it.
+   @bootstrap_actions = []
+   @jobflow_steps = []
+   @installed_steps = []
+
+   # Default settings.
+   @name = 'Elasticity Job Flow'
+   @ami_version = 'latest'
+   @hadoop_version = '0.20.205'
+   @action_on_failure = 'TERMINATE_JOB_FLOW'
+   @keep_job_flow_alive_when_no_steps = false
+   @ec2_key_name = 'default'
+   @instance_count = 2
+   @master_instance_type = 'm1.small'
+   @slave_instance_type = 'm1.small'
end
- # Create a jobflow from an AWS <member> (Nokogiri::XML::Element):
- # /DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member
- def self.from_member_element(xml_element)
- jobflow = JobFlow.new
- jobflow.name = xml_element.xpath("./Name").text.strip
- jobflow.jobflow_id = xml_element.xpath("./JobFlowId").text.strip
- jobflow.state = xml_element.xpath("./ExecutionStatusDetail/State").text.strip
- jobflow.last_state_change_reason = xml_element.xpath("./ExecutionStatusDetail/LastStateChangeReason").text.strip
- jobflow.steps = JobFlowStep.from_members_nodeset(xml_element.xpath("./Steps/member"))
- jobflow.created_at = Time.parse(xml_element.xpath("./ExecutionStatusDetail/CreationDateTime").text.strip)
- started_at = xml_element.xpath("./ExecutionStatusDetail/StartDateTime").text.strip
- jobflow.started_at = (started_at == "") ? (nil) : (Time.parse(started_at))
- ready_at = xml_element.xpath("./ExecutionStatusDetail/ReadyDateTime").text.strip
- jobflow.ready_at = (ready_at == "") ? (nil) : (Time.parse(ready_at))
- jobflow.instance_count = xml_element.xpath("./Instances/InstanceCount").text.strip
- jobflow.master_instance_type = xml_element.xpath("./Instances/MasterInstanceType").text.strip
- jobflow.slave_instance_type = xml_element.xpath("./Instances/SlaveInstanceType").text.strip
- jobflow
+ # Sets the number of instances requested for the job flow.
+ #
+ # @param count [Integer] number of instances; must be greater than 1
+ # @raise [ArgumentError] if count is less than 2
+ def instance_count=(count)
+   # Report the value that was actually requested instead of a hard-coded 1.
+   raise ArgumentError, "Instance count cannot be set to less than 2 (requested #{count})" unless count > 1
+   @instance_count = count
end
- # Create JobFlows from a collection of AWS <member> nodes (Nokogiri::XML::NodeSet):
- # /DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows
- def self.from_members_nodeset(members_nodeset)
- jobflows = []
- members_nodeset.each do |member|
- jobflows << from_member_element(member)
+ # Queues a bootstrap action to be sent when the job flow is launched.
+ # JobFlowRunningError is handed to raise_if (defined outside this file)
+ # when the job flow has already been launched.
+ #
+ # @param bootstrap_action object that responds to #to_aws_bootstrap_action
+ # @return [Array] the bootstrap actions queued so far
+ def add_bootstrap_action(bootstrap_action)
+   already_running_message = 'To modify bootstrap actions, please create a new job flow.'
+   raise_if(is_jobflow_running?, JobFlowRunningError, already_running_message)
+   @bootstrap_actions.push(bootstrap_action)
+ end
+
+ # Adds a step to the job flow.
+ #
+ # Before the job flow is launched the step is only queued locally. Once the
+ # job flow is running the step is submitted to EMR immediately, preceded by
+ # its installation step when the step type requires one that has not been
+ # submitted yet.
+ #
+ # @param jobflow_step step object; must respond to #to_aws_step and its class
+ #   must respond to requires_installation? and aws_installation_step
+ # @return the result of Elasticity::EMR#add_jobflow_steps when running,
+ #   otherwise the array of locally queued steps
+ def add_step(jobflow_step)
+   if is_jobflow_running?
+     step_class = jobflow_step.class
+     needs_installation = step_class.send(:requires_installation?) && !@installed_steps.include?(step_class)
+     jobflow_steps = []
+     jobflow_steps << step_class.send(:aws_installation_step) if needs_installation
+     jobflow_steps << jobflow_step.to_aws_step(self)
+     result = @emr.add_jobflow_steps(@jobflow_id, {:steps => jobflow_steps})
+     # Record the installation only after EMR accepted the request, so later
+     # steps of the same type do not submit the installation step again.
+     @installed_steps << step_class if needs_installation
+     result
+   else
+     @jobflow_steps << jobflow_step
end
- jobflows
+ end
+
+ # Launches the job flow on EMR using the queued steps and bootstrap actions.
+ # The two preconditions are checked through raise_if (defined outside this
+ # file): at least one step must be queued, and the job flow must not have
+ # been launched already.
+ #
+ # @return the value returned by Elasticity::EMR#run_job_flow, which is kept
+ #   as this job flow's id
+ def run
+   no_steps_message = 'Cannot run a job flow without adding steps. Please use #add_step.'
+   already_run_message = 'Cannot run a job flow multiple times. To do more with this job flow, please use #add_step.'
+   raise_if(@jobflow_steps.empty?, JobFlowMissingStepsError, no_steps_message)
+   raise_if(is_jobflow_running?, JobFlowRunningError, already_run_message)
+   @jobflow_id ||= @emr.run_job_flow(jobflow_config)
+ end
+
+ # Asks EMR to terminate this job flow. JobFlowNotStartedError is handed to
+ # raise_unless (defined outside this file) when the job flow has not been
+ # launched.
+ #
+ # @return the value returned by Elasticity::EMR#terminate_jobflows
+ def shutdown
+   not_started_message = 'Cannot #shutdown a job flow that has not yet been #run.'
+   raise_unless(is_jobflow_running?, JobFlowNotStartedError, not_started_message)
+   @emr.terminate_jobflows(@jobflow_id)
+ end
+
+ # Retrieves this job flow's description from EMR. JobFlowNotStartedError is
+ # handed to raise_unless (defined outside this file) when the job flow has
+ # not been launched.
+ #
+ # @return the value returned by Elasticity::EMR#describe_jobflow
+ def status
+   not_started_message = 'Please #run this job flow before attempting to retrieve status.'
+   raise_unless(is_jobflow_running?, JobFlowNotStartedError, not_started_message)
+   @emr.describe_jobflow(@jobflow_id)
+ end
+
+ private
+
+ def is_jobflow_running? # truthy once #run has stored a job flow id; returns @jobflow_id itself (nil before #run), not true/false
+ @jobflow_id
+ end
+
+ # Assembles the configuration hash handed to Elasticity::EMR#run_job_flow.
+ #
+ # Starts from #jobflow_preamble, always adds :steps, and adds :log_uri and
+ # :bootstrap_actions only when they have been provided. Building :steps
+ # also records which step types had an installation step emitted.
+ #
+ # @return [Hash]
+ def jobflow_config
+   jobflow_preamble.tap do |settings|
+     settings[:steps] = jobflow_steps
+     settings[:log_uri] = @log_uri if @log_uri
+     unless @bootstrap_actions.empty?
+       settings[:bootstrap_actions] = @bootstrap_actions.map(&:to_aws_bootstrap_action)
+     end
+   end
+ end
+
+ # Builds the part of the run configuration that does not depend on steps,
+ # logging or bootstrap actions.
+ #
+ # @return [Hash] :name, :ami_version and an :instances hash; :instances also
+ #   carries :ec2_subnet_id when one has been set
+ def jobflow_preamble
+   {
+     :name => @name,
+     :ami_version => @ami_version,
+     :instances => {
+       :keep_job_flow_alive_when_no_steps => @keep_job_flow_alive_when_no_steps,
+       :ec2_key_name => @ec2_key_name,
+       :hadoop_version => @hadoop_version,
+       :instance_count => @instance_count,
+       :master_instance_type => @master_instance_type,
+       :slave_instance_type => @slave_instance_type,
+     }
+   }.tap do |preamble|
+     # The EMR RunJobFlow API defines Ec2SubnetId as a member of Instances,
+     # not as a top-level parameter, so it belongs in the nested hash.
+     preamble[:instances][:ec2_subnet_id] = @ec2_subnet_id if @ec2_subnet_id
+   end
+ end
+
+ # Converts the locally queued steps into their AWS form, in queue order.
+ #
+ # The first time a step type that requires installation is seen, its
+ # installation step is emitted ahead of it and the type is recorded in
+ # @installed_steps so it is not emitted again.
+ #
+ # @return [Array] the AWS step definitions (empty when nothing is queued)
+ def jobflow_steps
+   @jobflow_steps.inject([]) do |aws_steps, queued_step|
+     step_class = queued_step.class
+     if step_class.send(:requires_installation?) && !@installed_steps.include?(step_class)
+       aws_steps << step_class.send(:aws_installation_step)
+       @installed_steps << step_class
+     end
+     # << returns the array, which becomes the accumulator for the next step.
+     aws_steps << queued_step.to_aws_step(self)
+   end
end
end
end
\ No newline at end of file