app/models/logical/naf/job_creator.rb in naf-1.1.4 vs app/models/logical/naf/job_creator.rb in naf-2.0.0

- old
+ new

@@ -12,22 +12,13 @@ # Before adding a job to the queue, check whether the number of # jobs (running/queued) is equal to or greater than the application # run group limit, or if enqueue_backlogs is set to false. If so, # do not add the job to the queue - running_jobs = ::Naf::RunningJob. - select('application_run_group_limit, MAX(created_at) AS created_at, count(*)'). - where('command = ? AND application_run_group_name = ?', - application.command, application_run_group_name). - group('application_run_group_name, application_run_group_limit').first + running_jobs = retrieve_jobs(::Naf::RunningJob, application.command, application_run_group_name) + queued_jobs = retrieve_jobs(::Naf::QueuedJob, application.command, application_run_group_name) - queued_jobs = ::Naf::QueuedJob. - select('application_run_group_limit, MAX(created_at) AS created_at, count(*)'). - where('command = ? AND application_run_group_name = ?', - application.command, application_run_group_name). - group('application_run_group_name, application_run_group_limit').first - if enqueue == false && (running_jobs.present? || queued_jobs.present?) group_limit = running_jobs.try(:application_run_group_limit).to_i + queued_jobs.try(:application_run_group_limit).to_i total_jobs = running_jobs.try(:count).to_i + queued_jobs.try(:count).to_i return if group_limit <= total_jobs @@ -40,11 +31,10 @@ application_run_group_restriction_id: application_run_group_restriction.id, application_run_group_name: application_run_group_name, application_run_group_limit: application_run_group_limit, priority: priority, log_level: application.log_level) - historical_job.add_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:pre_work]]) # Create historical job affinity tabs for each affinity associated with the historical job affinities.each do |affinity| affinity_parameter = ::Naf::ApplicationScheduleAffinityTab. where(affinity_id: affinity.id, @@ -53,23 +43,24 @@ ::Naf::HistoricalJobAffinityTab.create(historical_job_id: historical_job.id, affinity_id: affinity.id, affinity_parameter: affinity_parameter) end - historical_job.verify_prerequisites(prerequisites) - # Create historical job prerequisites for each prerequisite associated with the historical job - prerequisites.each do |prerequisite| - ::Naf::HistoricalJobPrerequisite.create(historical_job_id: historical_job.id, - prerequisite_historical_job_id: prerequisite.id) - end + verify_and_create_prerequisites(historical_job, prerequisites) create_queue_job(historical_job) return historical_job end end + def retrieve_jobs(klass, command, application_run_group_name) + klass.select('application_run_group_limit, MAX(created_at) AS created_at, count(*)'). + where('command = ? AND application_run_group_name = ?', command, application_run_group_name). + group('application_run_group_name, application_run_group_limit').first + end + def queue_application_schedule(application_schedule, schedules_queued_already = []) prerequisite_jobs = [] # Check if schedule has been queued if schedules_queued_already.include? application_schedule.id @@ -112,18 +103,23 @@ priority: priority) affinities.each do |affinity| ::Naf::HistoricalJobAffinityTab.create(historical_job_id: historical_job.id, affinity_id: affinity.id) end - historical_job.verify_prerequisites(prerequisites) - prerequisites.each do |prerequisite| - ::Naf::HistoricalJobPrerequisite.create(historical_job_id: historical_job.id, - prerequisite_historical_job_id: prerequisite.id) - end + verify_and_create_prerequisites(historical_job, prerequisites) create_queue_job(historical_job) return historical_job + end + end + + def verify_and_create_prerequisites(job, prerequisites) + job.verify_prerequisites(prerequisites) + # Create historical job prerequisites for each prerequisite associated with the historical job + prerequisites.each do |prerequisite| + ::Naf::HistoricalJobPrerequisite.create(historical_job_id: job.id, + prerequisite_historical_job_id: prerequisite.id) end end def create_queue_job(historical_job) queued_job = ::Naf::QueuedJob.new(application_id: historical_job.application_id,