lib/qless/lua/qless-lib.lua in qless-0.11.0 vs lib/qless/lua/qless-lib.lua in qless-0.12.0

- old
+ new

@@ -1,6 +1,6 @@ --- Current SHA: 525c39000dc71df53a3502491cb4daf0e1128f1d +-- Current SHA: 9d2cca3846a96fee53000085e36638e74ed392ed -- This is a generated file ------------------------------------------------------------------------------- -- Forward declarations to make everything happy ------------------------------------------------------------------------------- local Qless = { @@ -66,18 +66,18 @@ -- Failed([group, [start, [limit]]]) -- ------------------------------------ -- If no group is provided, this returns a JSON blob of the counts of the -- various groups of failures known. If a group is provided, it will report up -- to `limit` from `start` of the jobs affected by that issue. --- +-- -- # If no group, then... -- { -- 'group1': 1, -- 'group2': 5, -- ... -- } --- +-- -- # If a group is provided, then... -- { -- 'total': 20, -- 'jobs': [ -- { @@ -119,13 +119,13 @@ -- 'stalled' | 'running' | 'scheduled' | 'depends', 'recurring' -- ), queue, [offset, [count]]) ------------------------------------------------------------------------------- -- Return all the job ids currently considered to be in the provided state -- in a particular queue. The response is a list of job ids: --- +-- -- [ --- jid1, +-- jid1, -- jid2, -- ... -- ] function Qless.jobs(now, state, ...) assert(state, 'Jobs(): Arg "state" missing') @@ -152,11 +152,11 @@ queue:check_scheduled(now, queue.scheduled.length()) return queue.scheduled.peek(now, offset, count) elseif state == 'depends' then return queue.depends.peek(now, offset, count) elseif state == 'recurring' then - return queue.recurring.peek(math.huge, offset, count) + return queue.recurring.peek('+inf', offset, count) else error('Jobs(): Unknown type "' .. state .. '"') end end end @@ -167,11 +167,11 @@ -- If no arguments are provided, it returns details of all currently-tracked -- jobs. If the first argument is 'track', then it will start tracking the job -- associated with that id, and 'untrack' stops tracking it. In this context, -- tracking is nothing more than saving the job to a list of jobs that are -- considered special. --- +-- -- { -- 'jobs': [ -- { -- 'jid': ..., -- # All the other details you'd get from 'get' @@ -252,22 +252,22 @@ if tags then -- Decode the json blob, convert to dictionary tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - + -- Otherwise, add the job to the sorted set with that tags for i=2,#arg do local tag = arg[i] - if _tags[tag] == nil then + if _tags[tag] == nil or _tags[tag] == false then _tags[tag] = true table.insert(tags, tag) end redis.call('zadd', 'ql:t:' .. tag, now, jid) redis.call('zincrby', 'ql:tags', 1, tag) end - + redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags)) return tags else error('Tag(): Job ' .. jid .. ' does not exist') end @@ -278,22 +278,22 @@ if tags then -- Decode the json blob, convert to dictionary tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - + -- Otherwise, add the job to the sorted set with that tags for i=2,#arg do local tag = arg[i] _tags[tag] = nil redis.call('zrem', 'ql:t:' .. tag, jid) redis.call('zincrby', 'ql:tags', -1, tag) end - + local results = {} for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end - + redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results)) return results else error('Tag(): Job ' .. jid .. ' does not exist') end @@ -331,11 +331,11 @@ -- Now, we'll loop through every jid we intend to cancel, and we'll go -- make sure that this operation will be ok for i, jid in ipairs(arg) do for j, dep in ipairs(dependents[jid]) do - if dependents[dep] == nil then + if dependents[dep] == nil or dependents[dep] == false then error('Cancel(): ' .. jid .. ' is a dependency of ' .. dep .. ' but is not mentioned to be canceled') end end end @@ -416,11 +416,11 @@ -- Just go ahead and delete our data redis.call('del', QlessJob.ns .. jid) redis.call('del', QlessJob.ns .. jid .. '-history') end end - + return arg end ------------------------------------------------------------------------------- -- Configuration interactions @@ -533,30 +533,30 @@ end end -- Complete a job and optionally put it in another queue, either scheduled or -- to be considered waiting immediately. It can also optionally accept other --- jids on which this job will be considered dependent before it's considered +-- jids on which this job will be considered dependent before it's considered -- valid. -- -- The variable-length arguments may be pairs of the form: --- +-- -- ('next' , queue) : The queue to advance it to next -- ('delay' , delay) : The delay for the next queue -- ('depends', : Json of jobs it depends on in the new queue -- '["jid1", "jid2", ...]') --- -function QlessJob:complete(now, worker, queue, data, ...) +function QlessJob:complete(now, worker, queue, raw_data, ...) assert(worker, 'Complete(): Arg "worker" missing') assert(queue , 'Complete(): Arg "queue" missing') - data = assert(cjson.decode(data), - 'Complete(): Arg "data" missing or not JSON: ' .. tostring(data)) + local data = assert(cjson.decode(raw_data), + 'Complete(): Arg "data" missing or not JSON: ' .. tostring(raw_data)) -- Read in all the optional parameters local options = {} for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end - + -- Sanity check on optional args local nextq = options['next'] local delay = assert(tonumber(options['delay'] or 0)) local depends = assert(cjson.decode(options['depends'] or '[]'), 'Complete(): Arg "depends" not JSON: ' .. tostring(options['depends'])) @@ -579,18 +579,19 @@ local lastworker, state, priority, retries, current_queue = unpack( redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state', 'priority', 'retries', 'queue')) if lastworker == false then - error('Complete(): Job does not exist') + error('Complete(): Job ' .. self.jid .. ' does not exist') elseif (state ~= 'running') then - error('Complete(): Job is not currently running: ' .. state) + error('Complete(): Job ' .. self.jid .. ' is not currently running: ' .. + state) elseif lastworker ~= worker then - error('Complete(): Job has been handed out to another worker: ' .. - tostring(lastworker)) + error('Complete(): Job ' .. self.jid .. + ' has been handed out to another worker: ' .. tostring(lastworker)) elseif queue ~= current_queue then - error('Complete(): Job running in another queue: ' .. + error('Complete(): Job ' .. self.jid .. ' running in another queue: ' .. tostring(current_queue)) end -- Now we can assume that the worker does own the job. We need to -- 1) Remove the job from the 'locks' from the old queue @@ -598,12 +599,12 @@ -- 3) Update the data -- 4) Mark the job as completed, remove the worker, remove expires, and -- update history self:history(now, 'done') - if data then - redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data)) + if raw_data then + redis.call('hset', QlessJob.ns .. self.jid, 'data', raw_data) end -- Remove the job from the previous queue local queue_obj = Qless.queue(queue) queue_obj.work.remove(self.jid) @@ -645,19 +646,19 @@ -- We're going to make sure that this queue is in the -- set of known queues if redis.call('zscore', 'ql:queues', nextq) == false then redis.call('zadd', 'ql:queues', now, nextq) end - + redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'waiting', 'worker', '', 'failure', '{}', 'queue', nextq, 'expires', 0, 'remaining', tonumber(retries)) - + if (delay > 0) and (#depends == 0) then queue_obj.scheduled.add(now + delay, self.jid) return 'scheduled' else -- These are the jids we legitimately have to wait on @@ -701,22 +702,22 @@ 'worker', '', 'failure', '{}', 'queue', '', 'expires', 0, 'remaining', tonumber(retries)) - + -- Do the completion dance local count = Qless.config.get('jobs-history-count') local time = Qless.config.get('jobs-history') - + -- These are the default values count = tonumber(count or 50000) time = tonumber(time or 7 * 24 * 60 * 60) - + -- Schedule this job for destructination eventually redis.call('zadd', 'ql:completed', now, self.jid) - + -- Now look at the expired job data. First, based on the current time local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time) -- Any jobs that need to be expired... delete for index, jid in ipairs(jids) do local tags = cjson.decode( @@ -728,11 +729,11 @@ redis.call('del', QlessJob.ns .. jid) redis.call('del', QlessJob.ns .. jid .. '-history') end -- And now remove those from the queued-for-cleanup queue redis.call('zremrangebyscore', 'ql:completed', 0, now - time) - + -- Now take the all by the most recent 'count' ids jids = redis.call('zrange', 'ql:completed', 0, (-1-count)) for index, jid in ipairs(jids) do local tags = cjson.decode( redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}') @@ -742,11 +743,11 @@ end redis.call('del', QlessJob.ns .. jid) redis.call('del', QlessJob.ns .. jid .. '-history') end redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count)) - + -- Alright, if this has any dependents, then we should go ahead -- and unstick those guys. for i, j in ipairs(redis.call( 'smembers', QlessJob.ns .. self.jid .. '-dependents')) do redis.call('srem', QlessJob.ns .. j .. '-dependencies', self.jid) @@ -766,32 +767,32 @@ redis.call('hset', QlessJob.ns .. j, 'state', 'waiting') end end end end - + -- Delete our dependents key redis.call('del', QlessJob.ns .. self.jid .. '-dependents') - + return 'complete' end end -- Fail(now, worker, group, message, [data]) -- ------------------------------------------------- -- Mark the particular job as failed, with the provided group, and a more -- specific message. By `group`, we mean some phrase that might be one of -- several categorical modes of failure. The `message` is something more -- job-specific, like perhaps a traceback. --- +-- -- This method should __not__ be used to note that a job has been dropped or -- has failed in a transient way. This method __should__ be used to note that -- a job has something really wrong with it that must be remedied. --- +-- -- The motivation behind the `group` is so that similar errors can be grouped -- together. Optionally, updated data can be provided for the job. A job in --- any state can be marked as failed. If it has been given to a worker as a +-- any state can be marked as failed. If it has been given to a worker as a -- job, then its subsequent requests to heartbeat or complete that job will -- fail. Failed jobs are kept until they are canceled or completed. -- -- __Returns__ the id of the failed job if successful, or `False` on failure. -- @@ -819,15 +820,16 @@ local queue, state, oldworker = unpack(redis.call( 'hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'worker')) -- If the job has been completed, we cannot fail it if not state then - error('Fail(): Job does not exist') + error('Fail(): Job ' .. self.jid .. 'does not exist') elseif state ~= 'running' then - error('Fail(): Job not currently running: ' .. state) + error('Fail(): Job ' .. self.jid .. 'not currently running: ' .. state) elseif worker ~= oldworker then - error('Fail(): Job running with another worker: ' .. oldworker) + error('Fail(): Job ' .. self.jid .. ' running with another worker: ' .. + oldworker) end -- Send out a log message Qless.publish('log', cjson.encode({ jid = self.jid, @@ -858,11 +860,11 @@ local queue_obj = Qless.queue(queue) queue_obj.work.remove(self.jid) queue_obj.locks.remove(self.jid) queue_obj.scheduled.remove(self.jid) - -- The reason that this appears here is that the above will fail if the + -- The reason that this appears here is that the above will fail if the -- job doesn't exist if data then redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data)) end @@ -895,11 +897,11 @@ -- retries a job has for a stage. -- -- Throws an exception if: -- - the worker is not the worker with a lock on the job -- - the job is not actually running --- +-- -- Otherwise, it returns the number of retries remaining. If the allowed -- retries have been exhausted, then it is automatically failed, and a negative -- number is returned. -- -- If a group and message is provided, then if the retries are exhausted, then @@ -908,23 +910,25 @@ function QlessJob:retry(now, queue, worker, delay, group, message) assert(queue , 'Retry(): Arg "queue" missing') assert(worker, 'Retry(): Arg "worker" missing') delay = assert(tonumber(delay or 0), 'Retry(): Arg "delay" not a number: ' .. tostring(delay)) - + -- Let's see what the old priority, and tags were local oldqueue, state, retries, oldworker, priority, failure = unpack( redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'retries', 'worker', 'priority', 'failure')) -- If this isn't the worker that owns if oldworker == false then - error('Retry(): Job does not exist') + error('Retry(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then - error('Retry(): Job is not currently running: ' .. state) + error('Retry(): Job ' .. self.jid .. ' is not currently running: ' .. + state) elseif oldworker ~= worker then - error('Retry(): Job has been given to another worker: ' .. oldworker) + error('Retry(): Job ' .. self.jid .. + ' has been given to another worker: ' .. oldworker) end -- For each of these, decrement their retries. If any of them -- have exhausted their retries, then we should mark them as -- failed. @@ -941,11 +945,11 @@ if remaining < 0 then -- Now remove the instance from the schedule, and work queues for the -- queue it's in local group = group or 'failed-retries-' .. queue self:history(now, 'failed', {['group'] = group}) - + redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'failed', 'worker', '', 'expires', '') -- If the failure has not already been set, then set it if group ~= nil and message ~= nil then @@ -965,11 +969,11 @@ 'Job exhausted retries in queue "' .. oldqueue .. '"', ['when'] = now, ['worker'] = unpack(self:data('worker')) })) end - + -- Add this type of failure to the list of failures redis.call('sadd', 'ql:failures', group) -- And add this particular instance to the failed types redis.call('lpush', 'ql:f:' .. group, self.jid) -- Increment the count of the failed jobs @@ -1101,15 +1105,18 @@ -- worker local job_worker, state = unpack( redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state')) if job_worker == false then -- This means the job doesn't exist - error('Heartbeat(): Job does not exist') + error('Heartbeat(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then - error('Heartbeat(): Job not currently running: ' .. state) + error( + 'Heartbeat(): Job ' .. self.jid .. ' not currently running: ' .. state) elseif job_worker ~= worker or #job_worker == 0 then - error('Heartbeat(): Job given out to another worker: ' .. job_worker) + error( + 'Heartbeat(): Job ' .. self.jid .. + ' given out to another worker: ' .. job_worker) else -- Otherwise, optionally update the user data, and the heartbeat if data then -- I don't know if this is wise, but I'm decoding and encoding -- the user data to hopefully ensure its sanity @@ -1117,15 +1124,15 @@ expires, 'worker', worker, 'data', cjson.encode(data)) else redis.call('hmset', QlessJob.ns .. self.jid, 'expires', expires, 'worker', worker) end - + -- Update hwen this job was last updated on that worker -- Add this job to the list of jobs handled by this worker redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid) - + -- And now we should just update the locks local queue = Qless.queue( redis.call('hget', QlessJob.ns .. self.jid, 'queue')) queue.locks.add(expires, self.jid) return expires @@ -1142,11 +1149,11 @@ tostring(priority)) -- Get the queue the job is currently in, if any local queue = redis.call('hget', QlessJob.ns .. self.jid, 'queue') - if queue == nil then + if queue == nil or queue == false then -- If the job doesn't exist, throw an error error('Priority(): Job ' .. self.jid .. ' does not exist') elseif queue == '' then -- Just adjust the priority redis.call('hset', QlessJob.ns .. self.jid, 'priority', priority) @@ -1175,20 +1182,20 @@ -- Times out the job now rather than when its lock is normally set to expire function QlessJob:timeout(now) local queue_name, state, worker = unpack(redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'worker')) - if queue_name == nil then - error('Timeout(): Job does not exist') + if queue_name == nil or queue_name == false then + error('Timeout(): Job ' .. self.jid .. ' does not exist') elseif state ~= 'running' then error('Timeout(): Job ' .. self.jid .. ' not running') else -- Time out the job self:history(now, 'timed-out') local queue = Qless.queue(queue_name) queue.locks.remove(self.jid) - queue.work.add(now, math.huge, self.jid) + queue.work.add(now, '+inf', self.jid) redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'stalled', 'expires', 0) local encoded = cjson.encode({ jid = self.jid, event = 'lock_lost', @@ -1259,11 +1266,11 @@ local count = tonumber(Qless.config.get('max-job-history', 100)) if count > 0 then -- We'll always keep the first item around local obj = redis.call('lpop', QlessJob.ns .. self.jid .. '-history') redis.call('ltrim', QlessJob.ns .. self.jid .. '-history', -count + 2, -1) - if obj ~= nil then + if obj ~= nil and obj ~= false then redis.call('lpush', QlessJob.ns .. self.jid .. '-history', obj) end end return redis.call('rpush', QlessJob.ns .. self.jid .. '-history', cjson.encode({math.floor(now), what, item})) @@ -1294,12 +1301,15 @@ end, remove = function(...) if #arg > 0 then return redis.call('zrem', queue:prefix('work'), unpack(arg)) end end, add = function(now, priority, jid) + if priority ~= '+inf' then + priority = priority - (now / 10000000000) + end return redis.call('zadd', - queue:prefix('work'), priority - (now / 10000000000), jid) + queue:prefix('work'), priority, jid) end, score = function(jid) return redis.call('zscore', queue:prefix('work'), jid) end, length = function() return redis.call('zcard', queue:prefix('work')) end @@ -1307,22 +1317,22 @@ -- Access to our locks queue.locks = { expired = function(now, offset, count) return redis.call('zrangebyscore', - queue:prefix('locks'), -math.huge, now, 'LIMIT', offset, count) + queue:prefix('locks'), '-inf', now, 'LIMIT', offset, count) end, peek = function(now, offset, count) return redis.call('zrangebyscore', queue:prefix('locks'), - now, math.huge, 'LIMIT', offset, count) + now, '+inf', 'LIMIT', offset, count) end, add = function(expires, jid) redis.call('zadd', queue:prefix('locks'), expires, jid) end, remove = function(...) if #arg > 0 then return redis.call('zrem', queue:prefix('locks'), unpack(arg)) end end, running = function(now) - return redis.call('zcount', queue:prefix('locks'), now, math.huge) + return redis.call('zcount', queue:prefix('locks'), now, '+inf') end, length = function(now) -- If a 'now' is provided, we're interested in how many are before -- that time if now then return redis.call('zcount', queue:prefix('locks'), 0, now) @@ -1451,15 +1461,15 @@ -- The results we'll be sending back local results = {} local key = 'ql:s:' .. name .. ':' .. bin .. ':' .. queue local count, mean, vk = unpack(redis.call('hmget', key, 'total', 'mean', 'vk')) - + count = tonumber(count) or 0 mean = tonumber(mean) or 0 vk = tonumber(vk) - + results.count = count or 0 results.mean = mean or 0 results.histogram = {} if not count then @@ -1505,12 +1515,12 @@ -- look for all the recurring jobs that need jobs run self:check_recurring(now, count - #jids) -- Now we've checked __all__ the locks for this queue the could -- have expired, and are no more than the number requested. If - -- we still need values in order to meet the demand, then we - -- should check if any scheduled items, and if so, we should + -- we still need values in order to meet the demand, then we + -- should check if any scheduled items, and if so, we should -- insert them to ensure correctness when pulling off the next -- unit of work. self:check_scheduled(now, count - #jids) -- With these in place, we can expand this list of jids based on the work @@ -1580,12 +1590,12 @@ -- If we still need jobs in order to meet demand, then we should -- look for all the recurring jobs that need jobs run self:check_recurring(now, count - #jids) - -- If we still need values in order to meet the demand, then we - -- should check if any scheduled items, and if so, we should + -- If we still need values in order to meet the demand, then we + -- should check if any scheduled items, and if so, we should -- insert them to ensure correctness when pulling off the next -- unit of work. self:check_scheduled(now, count - #jids) -- With these in place, we can expand this list of jids based on the work @@ -1603,23 +1613,23 @@ redis.call('hget', QlessJob.ns .. jid, 'time') or now) local waiting = now - time self:stat(now, 'wait', waiting) redis.call('hset', QlessJob.ns .. jid, 'time', string.format("%.20f", now)) - + -- Add this job to the list of jobs handled by this worker redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid) - + -- Update the jobs data, and add its locks, and return the job job:update({ worker = worker, expires = expires, state = 'running' }) - + self.locks.add(expires, jid) - + local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false if tracked then Qless.publish('popped', jid) end end @@ -1666,11 +1676,11 @@ redis.call('hincrby', key, 'm' .. math.floor(val / 60), 1) elseif val < 86400 then -- hours redis.call('hincrby', key, 'h' .. math.floor(val / 3600), 1) else -- days redis.call('hincrby', key, 'd' .. math.floor(val / 86400), 1) - end + end redis.call('hmset', key, 'total', count, 'mean', mean, 'vk', vk) end -- Put(now, jid, klass, data, delay, -- [priority, p], @@ -1726,12 +1736,12 @@ for _, d in ipairs(depends) do new[d] = 1 end -- Now find what's in the original, but not the new local original = redis.call( 'smembers', QlessJob.ns .. jid .. '-dependencies') - for _, dep in pairs(original) do - if new[dep] == nil then + for _, dep in pairs(original) do + if new[dep] == nil or new[dep] == false then -- Remove k as a dependency redis.call('srem', QlessJob.ns .. dep .. '-dependents' , jid) redis.call('srem', QlessJob.ns .. jid .. '-dependencies', dep) end end @@ -1849,11 +1859,11 @@ self.work.add(now, priority, jid) end end -- Lastly, we're going to make sure that this item is in the - -- set of known queues. We should keep this sorted by the + -- set of known queues. We should keep this sorted by the -- order in which we saw each of these queues if redis.call('zscore', 'ql:queues', self.name) == false then redis.call('zadd', 'ql:queues', now, self.name) end @@ -1919,11 +1929,11 @@ -- Read in all the optional parameters. All of these must come in -- pairs, so if we have an odd number of extra args, raise an error if #arg % 2 == 1 then error('Odd number of additional args: ' .. tostring(arg)) end - + -- Read in all the optional parameters local options = {} for i = 3, #arg, 2 do options[arg[i]] = arg[i + 1] end options.tags = assert(cjson.decode(options.tags or '{}'), 'Recur(): Arg "tags" must be JSON string array: ' .. tostring( @@ -1939,16 +1949,16 @@ options.backlog)) local count, old_queue = unpack(redis.call('hmget', 'ql:r:' .. jid, 'count', 'queue')) count = count or 0 - -- If it has previously been in another queue, then we should remove + -- If it has previously been in another queue, then we should remove -- some information about it if old_queue then Qless.queue(old_queue).recurring.remove(jid) end - + -- Do some insertions redis.call('hmset', 'ql:r:' .. jid, 'jid' , jid, 'klass' , klass, 'data' , raw_data, @@ -1962,18 +1972,18 @@ 'interval', interval, 'retries' , options.retries, 'backlog' , options.backlog) -- Now, we should schedule the next run of the job self.recurring.add(now + offset, jid) - + -- Lastly, we're going to make sure that this item is in the - -- set of known queues. We should keep this sorted by the + -- set of known queues. We should keep this sorted by the -- order in which we saw each of these queues if redis.call('zscore', 'ql:queues', self.name) == false then redis.call('zadd', 'ql:queues', now, self.name) end - + return jid else error('Recur(): schedule type "' .. tostring(spec) .. '" unknown') end end @@ -2015,26 +2025,26 @@ score = score + ( math.ceil(num - backlog) * interval ) end end - - -- We're saving this value so that in the history, we can accurately + + -- We're saving this value so that in the history, we can accurately -- reflect when the job would normally have been scheduled while (score <= now) and (moved < count) do local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1) moved = moved + 1 local child_jid = jid .. '-' .. count - + -- Add this job to the list of jobs tagged with whatever tags were -- supplied for i, tag in ipairs(_tags) do redis.call('zadd', 'ql:t:' .. tag, now, child_jid) redis.call('zincrby', 'ql:tags', 1, tag) end - + -- First, let's save its data redis.call('hmset', QlessJob.ns .. child_jid, 'jid' , child_jid, 'klass' , klass, 'data' , data, @@ -2047,16 +2057,16 @@ 'retries' , retries, 'remaining' , retries, 'time' , string.format("%.20f", score), 'spawned_from_jid', jid) Qless.job(child_jid):history(score, 'put', {q = self.name}) - + -- Now, if a delay was provided, and if it's in the future, -- then we'll have to schedule it. Otherwise, we're just -- going to add it to the work queue. self.work.add(score, priority, child_jid) - + score = score + interval self.recurring.add(score, jid) end end end @@ -2067,11 +2077,11 @@ function QlessQueue:check_scheduled(now, count) -- zadd is a list of arguments that we'll be able to use to -- insert into the work queue local scheduled = self.scheduled.ready(now, 0, count) for index, jid in ipairs(scheduled) do - -- With these in hand, we'll have to go out and find the + -- With these in hand, we'll have to go out and find the -- priorities of these jobs, and then we'll insert them -- into the work queue and then when that's complete, we'll -- remove them from the scheduled queue local priority = tonumber( redis.call('hget', QlessJob.ns .. jid, 'priority') or 0) @@ -2152,19 +2162,19 @@ redis.call('hdel', QlessJob.ns .. jid, 'grace', 0) -- See how many remaining retries the job has local remaining = tonumber(redis.call( 'hincrby', QlessJob.ns .. jid, 'remaining', -1)) - + -- This is where we actually have to time out the work if remaining < 0 then -- Now remove the instance from the schedule, and work queues -- for the queue it's in self.work.remove(jid) self.locks.remove(jid) self.scheduled.remove(jid) - + local group = 'failed-retries-' .. Qless.job(jid):data()['queue'] local job = Qless.job(jid) job:history(now, 'failed', {group = group}) redis.call('hmset', QlessJob.ns .. jid, 'state', 'failed', 'worker', '', @@ -2176,16 +2186,16 @@ ['message'] = 'Job exhausted retries in queue "' .. self.name .. '"', ['when'] = now, ['worker'] = unpack(job:data('worker')) })) - + -- Add this type of failure to the list of failures redis.call('sadd', 'ql:failures', group) -- And add this particular instance to the failed types redis.call('lpush', 'ql:f:' .. group, jid) - + if redis.call('zscore', 'ql:tracked', jid) ~= false then Qless.publish('failed', jid) end Qless.publish('log', cjson.encode({ jid = jid, @@ -2258,15 +2268,15 @@ -- Get all the attributes of this particular job function QlessRecurringJob:data() local job = redis.call( 'hmget', 'ql:r:' .. self.jid, 'jid', 'klass', 'state', 'queue', 'priority', 'interval', 'retries', 'count', 'data', 'tags', 'backlog') - + if not job[1] then return nil end - + return { jid = job[1], klass = job[2], state = job[3], queue = job[4], @@ -2285,11 +2295,11 @@ -- - interval -- - retries -- - data -- - klass -- - queue --- - backlog +-- - backlog function QlessRecurringJob:update(now, ...) local options = {} -- Make sure that the job exists if redis.call('exists', 'ql:r:' .. self.jid) ~= 0 then for i = 1, #arg, 2 do @@ -2343,14 +2353,14 @@ if tags then -- Decode the json blob, convert to dictionary tags = cjson.decode(tags) local _tags = {} for i,v in ipairs(tags) do _tags[v] = true end - + -- Otherwise, add the job to the sorted set with that tags - for i=1,#arg do if _tags[arg[i]] == nil then table.insert(tags, arg[i]) end end - + for i=1,#arg do if _tags[arg[i]] == nil or _tags[arg[i]] == false then table.insert(tags, arg[i]) end end + tags = cjson.encode(tags) redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags) return tags else error('Tag(): Job ' .. self.jid .. ' does not exist') @@ -2402,23 +2412,23 @@ end -- Provide data about all the workers, or if a specific worker is provided, -- then which jobs that worker is responsible for. If no worker is provided, -- expect a response of the form: --- +-- -- [ -- # This is sorted by the recency of activity from that worker -- { -- 'name' : 'hostname1-pid1', -- 'jobs' : 20, -- 'stalled': 0 -- }, { -- ... -- } -- ] --- +-- -- If a worker id is provided, then expect a response of the form: --- +-- -- { -- 'jobs': [ -- jid1, -- jid2, -- ...