app/models/logical/naf/job_creator.rb in naf-1.1.4 vs app/models/logical/naf/job_creator.rb in naf-2.0.0
- old
+ new
@@ -12,22 +12,13 @@
# Before adding a job to the queue, check whether the number of
# jobs (running/queued) is equal to or greater than the application
# run group limit, or if enqueue_backlogs is set to false. If so,
# do not add the job to the queue
- running_jobs = ::Naf::RunningJob.
- select('application_run_group_limit, MAX(created_at) AS created_at, count(*)').
- where('command = ? AND application_run_group_name = ?',
- application.command, application_run_group_name).
- group('application_run_group_name, application_run_group_limit').first
+ running_jobs = retrieve_jobs(::Naf::RunningJob, application.command, application_run_group_name)
+ queued_jobs = retrieve_jobs(::Naf::QueuedJob, application.command, application_run_group_name)
- queued_jobs = ::Naf::QueuedJob.
- select('application_run_group_limit, MAX(created_at) AS created_at, count(*)').
- where('command = ? AND application_run_group_name = ?',
- application.command, application_run_group_name).
- group('application_run_group_name, application_run_group_limit').first
-
if enqueue == false && (running_jobs.present? || queued_jobs.present?)
group_limit = running_jobs.try(:application_run_group_limit).to_i + queued_jobs.try(:application_run_group_limit).to_i
total_jobs = running_jobs.try(:count).to_i + queued_jobs.try(:count).to_i
return if group_limit <= total_jobs
@@ -40,11 +31,10 @@
application_run_group_restriction_id: application_run_group_restriction.id,
application_run_group_name: application_run_group_name,
application_run_group_limit: application_run_group_limit,
priority: priority,
log_level: application.log_level)
- historical_job.add_tags([::Naf::HistoricalJob::SYSTEM_TAGS[:pre_work]])
# Create historical job affinity tabs for each affinity associated with the historical job
affinities.each do |affinity|
affinity_parameter = ::Naf::ApplicationScheduleAffinityTab.
where(affinity_id: affinity.id,
@@ -53,23 +43,24 @@
::Naf::HistoricalJobAffinityTab.create(historical_job_id: historical_job.id,
affinity_id: affinity.id,
affinity_parameter: affinity_parameter)
end
- historical_job.verify_prerequisites(prerequisites)
- # Create historical job prerequisites for each prerequisite associated with the historical job
- prerequisites.each do |prerequisite|
- ::Naf::HistoricalJobPrerequisite.create(historical_job_id: historical_job.id,
- prerequisite_historical_job_id: prerequisite.id)
- end
+ verify_and_create_prerequisites(historical_job, prerequisites)
create_queue_job(historical_job)
return historical_job
end
end
+ def retrieve_jobs(klass, command, application_run_group_name)
+ klass.select('application_run_group_limit, MAX(created_at) AS created_at, count(*)').
+ where('command = ? AND application_run_group_name = ?', command, application_run_group_name).
+ group('application_run_group_name, application_run_group_limit').first
+ end
+
def queue_application_schedule(application_schedule, schedules_queued_already = [])
prerequisite_jobs = []
# Check if schedule has been queued
if schedules_queued_already.include? application_schedule.id
@@ -112,18 +103,23 @@
priority: priority)
affinities.each do |affinity|
::Naf::HistoricalJobAffinityTab.create(historical_job_id: historical_job.id, affinity_id: affinity.id)
end
- historical_job.verify_prerequisites(prerequisites)
- prerequisites.each do |prerequisite|
- ::Naf::HistoricalJobPrerequisite.create(historical_job_id: historical_job.id,
- prerequisite_historical_job_id: prerequisite.id)
- end
+ verify_and_create_prerequisites(historical_job, prerequisites)
create_queue_job(historical_job)
return historical_job
+ end
+ end
+
+ def verify_and_create_prerequisites(job, prerequisites)
+ job.verify_prerequisites(prerequisites)
+ # Create historical job prerequisites for each prerequisite associated with the historical job
+ prerequisites.each do |prerequisite|
+ ::Naf::HistoricalJobPrerequisite.create(historical_job_id: job.id,
+ prerequisite_historical_job_id: prerequisite.id)
end
end
def create_queue_job(historical_job)
queued_job = ::Naf::QueuedJob.new(application_id: historical_job.application_id,