lib/right_chimp/daemon/ChimpDaemon.rb in right_chimp-2.1.24 vs lib/right_chimp/daemon/ChimpDaemon.rb in right_chimp-2.1.25

- old
+ new

@@ -87,13 +87,13 @@ when '--bind-address', '-b' @bind_address = arg.to_s when '--help', '-h' help when '--exit', '-x' - uri = "http://localhost:#{@port}/admin" - response = RestClient.post uri, { 'shutdown' => true }.to_yaml - exit 0 + uri = "http://localhost:#{@port}/admin" + response = RestClient.post uri, { 'shutdown' => true }.to_yaml + exit 0 end end rescue GetoptLong::InvalidOption => ex puts "Syntax: chimpd [--logfile=<name>] [--concurrency=<c>] [--delay=<d>] [--retry=<r>] [--port=<p>] [--bind-address=<addr> ] [--verbose]" exit 1 @@ -339,10 +339,11 @@ # # GET a group by name and status # /group/<name>/<status> # def do_GET(req, resp) + jobs = [] Log.debug 'get group info' group_name = req.request_uri.path.split('/')[-2] filter = req.request_uri.path.split('/')[-1] @@ -360,14 +361,15 @@ g = ChimpQueue[group_name.to_sym] raise WEBrick::HTTPStatus::NotFound, 'Group not found' unless g || g2 jobs = g.get_jobs_by_status(filter) if g # If there are processing jobs, add them as dummy executions. - if g2 + if g2 && !g2.empty? + Log.debug 'Group: ' + group_name + ' is processing:' g2.each do |job| - Log.debug 'Job: ' + job + Log.debug 'Job: ' + job.to_s j = ExecRightScript.new(group: group_name, job_uuid: job) jobs.push j end end @@ -427,24 +429,30 @@ # # Ask chimpd to process a Chimp object directly # if verb == 'process' or verb == 'add' - # comment the next line to GET STUCK IN PROCESSING forever - ChimpDaemon.instance.chimp_queue.push payload ChimpDaemon.instance.semaphore.synchronize do # While we are at it, we will store these processing jobs to prevent issues in the event # of a very slow API response. Log.debug 'Adding job: ' + job_uuid + ' to the processing queue for group: ' + group.to_s - q.processing[payload.group] = [] if q.processing[payload.group].nil? - q.processing[payload.group].push(payload.job_uuid) + q.processing[payload.group] = {} if q.processing[payload.group].nil? + q.processing[payload.group][payload.job_uuid.to_sym] = 0 + ChimpDaemon.instance.proc_counter += 1 end + # comment the next line to GET STUCK IN PROCESSING forever + ChimpDaemon.instance.chimp_queue.push payload - Log.debug 'Tasks in the processing queue:' + ChimpDaemon.instance.proc_counter.to_s - Log.debug 'Pocessing:' + # Proper count of processing Tasks + counter = 0 + q.processing.each{|k,v| + counter += v.size + } + Log.debug 'Processing:' + Log.debug 'Tasks in the processing queue:' + counter.to_s Log.debug q.processing.inspect elsif verb == 'update' puts 'UPDATE' q.get_job(job_id).status = payload.status end @@ -569,11 +577,11 @@ stats = "" stats << "running: #{queue.get_jobs_by_status(:running).size} / " stats << "waiting: #{queue.get_jobs_by_status(:none).size} / " stats << "failed: #{queue.get_jobs_by_status(:error).size} / " stats << "done: #{queue.get_jobs_by_status(:done).size} / " - stats << "processing: #{ChimpDaemon.instance.proc_counter.to_i} / " + stats << "processing: #{ChimpDaemon.instance.proc_counter} / " stats << "\n" resp.body = stats raise WEBrick::HTTPStatus::OK @@ -585,10 +593,10 @@ stats_hash = {"running" => queue.get_jobs_by_status(:running).size, "waiting" => queue.get_jobs_by_status(:none).size, "failed" => queue.get_jobs_by_status(:error).size, "done" => queue.get_jobs_by_status(:done).size, - "processing" => ChimpDaemon.instance.proc_counter.to_i, + "processing" => ChimpDaemon.instance.proc_counter, "holding" => queue.get_jobs_by_status(:holding).size } resp.body = JSON.generate(stats_hash)