lib/right_chimp/daemon/ChimpDaemon.rb in right_chimp-2.1.22.2 vs lib/right_chimp/daemon/ChimpDaemon.rb in right_chimp-2.1.24
- old
+ new
@@ -248,15 +248,13 @@
Log.debug "Spawning #{n} submission processing threads"
(1..n).each do |n|
@threads ||=[]
@threads << Thread.new {
-
while true
begin
-
queued_request = @chimp_queue.pop
group = queued_request.group
queued_request.interactive = false
tasks = queued_request.process
tasks.each do |task|
@@ -341,18 +339,42 @@
#
# GET a group by name and status
# /group/<name>/<status>
#
def do_GET(req, resp)
- jobs = {}
+ jobs = []
+ Log.debug 'get group info'
group_name = req.request_uri.path.split('/')[-2]
filter = req.request_uri.path.split('/')[-1]
+ # Quickly check processing jobs just in case
+ # Instance the entire queue
+ q = ChimpQueue.instance
+ g2 = q.processing[group_name.to_sym]
+
+ if g2
+ Log.debug 'Found processing job(s): ' + g2.inspect
+ else
+ Log.debug 'not found any processing jobs for that group: ' + g2.inspect
+ end
+
g = ChimpQueue[group_name.to_sym]
- raise WEBrick::HTTPStatus::NotFound, "Group not found" unless g
- jobs = g.get_jobs_by_status(filter)
+ raise WEBrick::HTTPStatus::NotFound, 'Group not found' unless g || g2
+ jobs = g.get_jobs_by_status(filter) if g
+
+ # If there are processing jobs, add them as dummy executions.
+ if g2
+ Log.debug 'Group: ' + group_name + ' is processing:'
+ g2.each do |job|
+ Log.debug 'Job: ' + job
+ j = ExecRightScript.new(group: group_name, job_uuid: job)
+ jobs.push j
+ end
+ end
+
resp.body = jobs.to_yaml
+
raise WEBrick::HTTPStatus::OK
end
#
# POST to a group to trigger a group action
@@ -405,32 +427,41 @@
#
# Ask chimpd to process a Chimp object directly
#
if verb == 'process' or verb == 'add'
+ # comment the next line to GET STUCK IN PROCESSING forever
ChimpDaemon.instance.chimp_queue.push payload
ChimpDaemon.instance.semaphore.synchronize do
- ChimpDaemon.instance.proc_counter +=1
+ # While we are at it, we will store these processing jobs to prevent issues in the event
+ # of a very slow API response.
+ Log.debug 'Adding job: ' + job_uuid + ' to the processing queue for group: ' + group.to_s
+ q.processing[payload.group] = [] if q.processing[payload.group].nil?
+ q.processing[payload.group].push(payload.job_uuid)
+
+ ChimpDaemon.instance.proc_counter += 1
end
- Log.debug "Tasks in the processing queue: #{ChimpDaemon.instance.proc_counter.to_s}"
- id = 0
+
+ Log.debug 'Tasks in the processing queue:' + ChimpDaemon.instance.proc_counter.to_s
+ Log.debug 'Pocessing:'
+ Log.debug q.processing.inspect
elsif verb == 'update'
- puts "UPDATE"
+ puts 'UPDATE'
q.get_job(job_id).status = payload.status
end
resp.body = {
- 'job_uuid' => job_uuid ,
+ 'job_uuid' => job_uuid,
'id' => job_id
}.to_yaml
raise WEBrick::HTTPStatus::OK
end
def do_GET(req, resp)
id = self.get_id(req)
verb = self.get_verb(req)
- job_results = "OK"
+ job_results = 'OK'
queue = ChimpQueue.instance
#
# check for special job ids
#