lib/elasticity/job_flow.rb in elasticity-2.2 vs lib/elasticity/job_flow.rb in elasticity-2.3

- old
+ new

@@ -15,43 +15,49 @@ attr_accessor :master_instance_type attr_accessor :slave_instance_type attr_accessor :ami_version attr_accessor :keep_job_flow_alive_when_no_steps attr_accessor :ec2_subnet_id + attr_accessor :placement def initialize(access, secret) @action_on_failure = 'TERMINATE_JOB_FLOW' @ec2_key_name = 'default' @hadoop_version = '0.20.205' @name = 'Elasticity Job Flow' @ami_version = 'latest' @keep_job_flow_alive_when_no_steps = false + @placement = 'us-east-1a' + @access = access + @secret = secret + @bootstrap_actions = [] @jobflow_steps = [] @installed_steps = [] @instance_groups = {} set_master_instance_group(Elasticity::InstanceGroup.new) set_core_instance_group(Elasticity::InstanceGroup.new) - @instance_count = 2 @master_instance_type = 'm1.small' @slave_instance_type = 'm1.small' - @emr = Elasticity::EMR.new(access, secret) + @access = access + @secret = secret end - def self.from_jobflow_id(access, secret, jobflow_id) + def self.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1') JobFlow.new(access, secret).tap do |j| + j.instance_variable_set(:@region, region) j.instance_variable_set(:@jobflow_id, jobflow_id) j.instance_variable_set(:@installed_steps, j.status.installed_steps) end end def instance_count=(count) - raise ArgumentError, 'Instance count cannot be set to less than 2 (requested 1)' unless count > 1 + raise ArgumentError, "Instance count cannot be set to less than 2 (requested #{count})" unless count > 1 @instance_groups[:core].count = count - 1 @instance_count = count end def master_instance_type=(type) @@ -89,32 +95,37 @@ jobflow_steps = [] if jobflow_step.requires_installation? && !@installed_steps.include?(jobflow_step.class) jobflow_steps << jobflow_step.aws_installation_step end jobflow_steps << jobflow_step.to_aws_step(self) - @emr.add_jobflow_steps(@jobflow_id, {:steps => jobflow_steps}) + emr.add_jobflow_steps(@jobflow_id, {:steps => jobflow_steps}) else @jobflow_steps << jobflow_step end end def run raise_if @jobflow_steps.empty?, JobFlowMissingStepsError, 'Cannot run a job flow without adding steps. Please use #add_step.' raise_if is_jobflow_running?, JobFlowRunningError, 'Cannot run a job flow multiple times. To do more with this job flow, please use #add_step.' - @jobflow_id = @emr.run_job_flow(jobflow_config) + @jobflow_id = emr.run_job_flow(jobflow_config) end def shutdown raise_unless is_jobflow_running?, JobFlowNotStartedError, 'Cannot #shutdown a job flow that has not yet been #run.' - @emr.terminate_jobflows(@jobflow_id) + emr.terminate_jobflows(@jobflow_id) end def status raise_unless is_jobflow_running?, JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.' - @emr.describe_jobflow(@jobflow_id) + emr.describe_jobflow(@jobflow_id) end private + + def emr + @region ||= @placement.match(/(\w+-\w+-\d+)/)[0] + @emr ||= Elasticity::EMR.new(@access, @secret, :region => @region) + end def is_jobflow_running? !@jobflow_id.nil? end \ No newline at end of file