lib/qless/lua/qless-lib.lua in qless-0.11.0 vs lib/qless/lua/qless-lib.lua in qless-0.12.0
- old
+ new
@@ -1,6 +1,6 @@
--- Current SHA: 525c39000dc71df53a3502491cb4daf0e1128f1d
+-- Current SHA: 9d2cca3846a96fee53000085e36638e74ed392ed
-- This is a generated file
-------------------------------------------------------------------------------
-- Forward declarations to make everything happy
-------------------------------------------------------------------------------
local Qless = {
@@ -66,18 +66,18 @@
-- Failed([group, [start, [limit]]])
-- ------------------------------------
-- If no group is provided, this returns a JSON blob of the counts of the
-- various groups of failures known. If a group is provided, it will report up
-- to `limit` from `start` of the jobs affected by that issue.
---
+--
-- # If no group, then...
-- {
-- 'group1': 1,
-- 'group2': 5,
-- ...
-- }
---
+--
-- # If a group is provided, then...
-- {
-- 'total': 20,
-- 'jobs': [
-- {
@@ -119,13 +119,13 @@
-- 'stalled' | 'running' | 'scheduled' | 'depends', 'recurring'
-- ), queue, [offset, [count]])
-------------------------------------------------------------------------------
-- Return all the job ids currently considered to be in the provided state
-- in a particular queue. The response is a list of job ids:
---
+--
-- [
--- jid1,
+-- jid1,
-- jid2,
-- ...
-- ]
function Qless.jobs(now, state, ...)
assert(state, 'Jobs(): Arg "state" missing')
@@ -152,11 +152,11 @@
queue:check_scheduled(now, queue.scheduled.length())
return queue.scheduled.peek(now, offset, count)
elseif state == 'depends' then
return queue.depends.peek(now, offset, count)
elseif state == 'recurring' then
- return queue.recurring.peek(math.huge, offset, count)
+ return queue.recurring.peek('+inf', offset, count)
else
error('Jobs(): Unknown type "' .. state .. '"')
end
end
end
@@ -167,11 +167,11 @@
-- If no arguments are provided, it returns details of all currently-tracked
-- jobs. If the first argument is 'track', then it will start tracking the job
-- associated with that id, and 'untrack' stops tracking it. In this context,
-- tracking is nothing more than saving the job to a list of jobs that are
-- considered special.
---
+--
-- {
-- 'jobs': [
-- {
-- 'jid': ...,
-- # All the other details you'd get from 'get'
@@ -252,22 +252,22 @@
if tags then
-- Decode the json blob, convert to dictionary
tags = cjson.decode(tags)
local _tags = {}
for i,v in ipairs(tags) do _tags[v] = true end
-
+
    -- Otherwise, add the job to the sorted set with those tags
for i=2,#arg do
local tag = arg[i]
- if _tags[tag] == nil then
+ if _tags[tag] == nil or _tags[tag] == false then
_tags[tag] = true
table.insert(tags, tag)
end
redis.call('zadd', 'ql:t:' .. tag, now, jid)
redis.call('zincrby', 'ql:tags', 1, tag)
end
-
+
redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(tags))
return tags
else
error('Tag(): Job ' .. jid .. ' does not exist')
end
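
For reference, the block above avoids duplicate tags by mirroring the current tag list into a lookup table before appending. A minimal standalone sketch of that pattern, with hypothetical tag values and the redis calls omitted:

local tags = {'alpha', 'beta'}                 -- tags already on the job
local _tags = {}
for i, v in ipairs(tags) do _tags[v] = true end

for _, tag in ipairs({'beta', 'gamma'}) do     -- tags being added
  if _tags[tag] == nil or _tags[tag] == false then
    _tags[tag] = true
    table.insert(tags, tag)
  end
end
-- tags is now {'alpha', 'beta', 'gamma'}; 'beta' was not re-added
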
@@ -278,22 +278,22 @@
if tags then
-- Decode the json blob, convert to dictionary
tags = cjson.decode(tags)
local _tags = {}
for i,v in ipairs(tags) do _tags[v] = true end
-
+
    -- Otherwise, add the job to the sorted set with those tags
for i=2,#arg do
local tag = arg[i]
_tags[tag] = nil
redis.call('zrem', 'ql:t:' .. tag, jid)
redis.call('zincrby', 'ql:tags', -1, tag)
end
-
+
local results = {}
for i,tag in ipairs(tags) do if _tags[tag] then table.insert(results, tag) end end
-
+
redis.call('hset', QlessJob.ns .. jid, 'tags', cjson.encode(results))
return results
else
error('Tag(): Job ' .. jid .. ' does not exist')
end
@@ -331,11 +331,11 @@
-- Now, we'll loop through every jid we intend to cancel, and we'll go
-- make sure that this operation will be ok
for i, jid in ipairs(arg) do
for j, dep in ipairs(dependents[jid]) do
- if dependents[dep] == nil then
+ if dependents[dep] == nil or dependents[dep] == false then
error('Cancel(): ' .. jid .. ' is a dependency of ' .. dep ..
' but is not mentioned to be canceled')
end
end
end
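
The guard above refuses to cancel a job unless every job that depends on it is also being canceled; `dependents` is keyed only by the jids passed in, so a missing key means "not part of this cancellation". A standalone sketch with hypothetical jids and no redis involved:

local to_cancel  = {'jid-a'}                    -- jids passed to cancel
local dependents = {['jid-a'] = {'jid-b'}}      -- 'jid-b' depends on 'jid-a'
for _, jid in ipairs(to_cancel) do
  for _, dep in ipairs(dependents[jid]) do
    if dependents[dep] == nil or dependents[dep] == false then
      -- raises here, because 'jid-b' is not in the cancellation set
      error('Cancel(): ' .. jid .. ' is a dependency of ' .. dep ..
        ' but is not mentioned to be canceled')
    end
  end
end
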
@@ -416,11 +416,11 @@
-- Just go ahead and delete our data
redis.call('del', QlessJob.ns .. jid)
redis.call('del', QlessJob.ns .. jid .. '-history')
end
end
-
+
return arg
end
-------------------------------------------------------------------------------
-- Configuration interactions
@@ -533,30 +533,30 @@
end
end
-- Complete a job and optionally put it in another queue, either scheduled or
-- to be considered waiting immediately. It can also optionally accept other
--- jids on which this job will be considered dependent before it's considered
+-- jids on which this job will be considered dependent before it's considered
-- valid.
--
-- The variable-length arguments may be pairs of the form:
---
+--
-- ('next' , queue) : The queue to advance it to next
-- ('delay' , delay) : The delay for the next queue
-- ('depends', : Json of jobs it depends on in the new queue
-- '["jid1", "jid2", ...]')
---
-function QlessJob:complete(now, worker, queue, data, ...)
+function QlessJob:complete(now, worker, queue, raw_data, ...)
assert(worker, 'Complete(): Arg "worker" missing')
assert(queue , 'Complete(): Arg "queue" missing')
- data = assert(cjson.decode(data),
- 'Complete(): Arg "data" missing or not JSON: ' .. tostring(data))
+ local data = assert(cjson.decode(raw_data),
+ 'Complete(): Arg "data" missing or not JSON: ' .. tostring(raw_data))
-- Read in all the optional parameters
local options = {}
for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end
-
+
-- Sanity check on optional args
local nextq = options['next']
local delay = assert(tonumber(options['delay'] or 0))
local depends = assert(cjson.decode(options['depends'] or '[]'),
'Complete(): Arg "depends" not JSON: ' .. tostring(options['depends']))
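
The optional arguments to complete() arrive as a flat list of name/value pairs and are folded into a table before the sanity checks above. A sketch of that parsing with hypothetical values; in the script itself `arg` is Lua 5.1's implicit vararg table, and the queue name 'emails' is made up for illustration:

local arg = {'next', 'emails', 'delay', '30', 'depends', '["jid1","jid2"]'}
local options = {}
for i = 1, #arg, 2 do options[arg[i]] = arg[i + 1] end
-- options['next']    == 'emails'
-- options['delay']   == '30'                 (tonumber()'d above)
-- options['depends'] == '["jid1","jid2"]'    (cjson.decode()'d above)
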
@@ -579,18 +579,19 @@
local lastworker, state, priority, retries, current_queue = unpack(
redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state',
'priority', 'retries', 'queue'))
if lastworker == false then
- error('Complete(): Job does not exist')
+ error('Complete(): Job ' .. self.jid .. ' does not exist')
elseif (state ~= 'running') then
- error('Complete(): Job is not currently running: ' .. state)
+ error('Complete(): Job ' .. self.jid .. ' is not currently running: ' ..
+ state)
elseif lastworker ~= worker then
- error('Complete(): Job has been handed out to another worker: ' ..
- tostring(lastworker))
+ error('Complete(): Job ' .. self.jid ..
+ ' has been handed out to another worker: ' .. tostring(lastworker))
elseif queue ~= current_queue then
- error('Complete(): Job running in another queue: ' ..
+ error('Complete(): Job ' .. self.jid .. ' running in another queue: ' ..
tostring(current_queue))
end
-- Now we can assume that the worker does own the job. We need to
-- 1) Remove the job from the 'locks' from the old queue
@@ -598,12 +599,12 @@
-- 3) Update the data
-- 4) Mark the job as completed, remove the worker, remove expires, and
-- update history
self:history(now, 'done')
- if data then
- redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data))
+ if raw_data then
+ redis.call('hset', QlessJob.ns .. self.jid, 'data', raw_data)
end
-- Remove the job from the previous queue
local queue_obj = Qless.queue(queue)
queue_obj.work.remove(self.jid)
@@ -645,19 +646,19 @@
-- We're going to make sure that this queue is in the
-- set of known queues
if redis.call('zscore', 'ql:queues', nextq) == false then
redis.call('zadd', 'ql:queues', now, nextq)
end
-
+
redis.call('hmset', QlessJob.ns .. self.jid,
'state', 'waiting',
'worker', '',
'failure', '{}',
'queue', nextq,
'expires', 0,
'remaining', tonumber(retries))
-
+
if (delay > 0) and (#depends == 0) then
queue_obj.scheduled.add(now + delay, self.jid)
return 'scheduled'
else
-- These are the jids we legitimately have to wait on
@@ -701,22 +702,22 @@
'worker', '',
'failure', '{}',
'queue', '',
'expires', 0,
'remaining', tonumber(retries))
-
+
-- Do the completion dance
local count = Qless.config.get('jobs-history-count')
local time = Qless.config.get('jobs-history')
-
+
-- These are the default values
count = tonumber(count or 50000)
time = tonumber(time or 7 * 24 * 60 * 60)
-
+
    -- Schedule this job for destruction eventually
redis.call('zadd', 'ql:completed', now, self.jid)
-
+
-- Now look at the expired job data. First, based on the current time
local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time)
-- Any jobs that need to be expired... delete
for index, jid in ipairs(jids) do
local tags = cjson.decode(
@@ -728,11 +729,11 @@
redis.call('del', QlessJob.ns .. jid)
redis.call('del', QlessJob.ns .. jid .. '-history')
end
-- And now remove those from the queued-for-cleanup queue
redis.call('zremrangebyscore', 'ql:completed', 0, now - time)
-
+
    -- Now take all but the most recent 'count' ids
jids = redis.call('zrange', 'ql:completed', 0, (-1-count))
for index, jid in ipairs(jids) do
local tags = cjson.decode(
redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
@@ -742,11 +743,11 @@
end
redis.call('del', QlessJob.ns .. jid)
redis.call('del', QlessJob.ns .. jid .. '-history')
end
redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count))
-
+
-- Alright, if this has any dependents, then we should go ahead
-- and unstick those guys.
for i, j in ipairs(redis.call(
'smembers', QlessJob.ns .. self.jid .. '-dependents')) do
redis.call('srem', QlessJob.ns .. j .. '-dependencies', self.jid)
@@ -766,32 +767,32 @@
redis.call('hset', QlessJob.ns .. j, 'state', 'waiting')
end
end
end
end
-
+
-- Delete our dependents key
redis.call('del', QlessJob.ns .. self.jid .. '-dependents')
-
+
return 'complete'
end
end
-- Fail(now, worker, group, message, [data])
-- -------------------------------------------------
-- Mark the particular job as failed, with the provided group, and a more
-- specific message. By `group`, we mean some phrase that might be one of
-- several categorical modes of failure. The `message` is something more
-- job-specific, like perhaps a traceback.
---
+--
-- This method should __not__ be used to note that a job has been dropped or
-- has failed in a transient way. This method __should__ be used to note that
-- a job has something really wrong with it that must be remedied.
---
+--
-- The motivation behind the `group` is so that similar errors can be grouped
-- together. Optionally, updated data can be provided for the job. A job in
--- any state can be marked as failed. If it has been given to a worker as a
+-- any state can be marked as failed. If it has been given to a worker as a
-- job, then its subsequent requests to heartbeat or complete that job will
-- fail. Failed jobs are kept until they are canceled or completed.
--
-- __Returns__ the id of the failed job if successful, or `False` on failure.
--
@@ -819,15 +820,16 @@
local queue, state, oldworker = unpack(redis.call(
'hmget', QlessJob.ns .. self.jid, 'queue', 'state', 'worker'))
-- If the job has been completed, we cannot fail it
if not state then
- error('Fail(): Job does not exist')
+    error('Fail(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
- error('Fail(): Job not currently running: ' .. state)
+    error('Fail(): Job ' .. self.jid .. ' not currently running: ' .. state)
elseif worker ~= oldworker then
- error('Fail(): Job running with another worker: ' .. oldworker)
+ error('Fail(): Job ' .. self.jid .. ' running with another worker: ' ..
+ oldworker)
end
-- Send out a log message
Qless.publish('log', cjson.encode({
jid = self.jid,
@@ -858,11 +860,11 @@
local queue_obj = Qless.queue(queue)
queue_obj.work.remove(self.jid)
queue_obj.locks.remove(self.jid)
queue_obj.scheduled.remove(self.jid)
- -- The reason that this appears here is that the above will fail if the
+ -- The reason that this appears here is that the above will fail if the
-- job doesn't exist
if data then
redis.call('hset', QlessJob.ns .. self.jid, 'data', cjson.encode(data))
end
@@ -895,11 +897,11 @@
-- retries a job has for a stage.
--
-- Throws an exception if:
-- - the worker is not the worker with a lock on the job
-- - the job is not actually running
---
+--
-- Otherwise, it returns the number of retries remaining. If the allowed
-- retries have been exhausted, then it is automatically failed, and a negative
-- number is returned.
--
-- If a group and message is provided, then if the retries are exhausted, then
@@ -908,23 +910,25 @@
function QlessJob:retry(now, queue, worker, delay, group, message)
assert(queue , 'Retry(): Arg "queue" missing')
assert(worker, 'Retry(): Arg "worker" missing')
delay = assert(tonumber(delay or 0),
'Retry(): Arg "delay" not a number: ' .. tostring(delay))
-
+
-- Let's see what the old priority, and tags were
local oldqueue, state, retries, oldworker, priority, failure = unpack(
redis.call('hmget', QlessJob.ns .. self.jid, 'queue', 'state',
'retries', 'worker', 'priority', 'failure'))
-- If this isn't the worker that owns
if oldworker == false then
- error('Retry(): Job does not exist')
+ error('Retry(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
- error('Retry(): Job is not currently running: ' .. state)
+ error('Retry(): Job ' .. self.jid .. ' is not currently running: ' ..
+ state)
elseif oldworker ~= worker then
- error('Retry(): Job has been given to another worker: ' .. oldworker)
+ error('Retry(): Job ' .. self.jid ..
+ ' has been given to another worker: ' .. oldworker)
end
-- For each of these, decrement their retries. If any of them
-- have exhausted their retries, then we should mark them as
-- failed.
@@ -941,11 +945,11 @@
if remaining < 0 then
-- Now remove the instance from the schedule, and work queues for the
-- queue it's in
local group = group or 'failed-retries-' .. queue
self:history(now, 'failed', {['group'] = group})
-
+
redis.call('hmset', QlessJob.ns .. self.jid, 'state', 'failed',
'worker', '',
'expires', '')
-- If the failure has not already been set, then set it
if group ~= nil and message ~= nil then
@@ -965,11 +969,11 @@
'Job exhausted retries in queue "' .. oldqueue .. '"',
['when'] = now,
['worker'] = unpack(self:data('worker'))
}))
end
-
+
-- Add this type of failure to the list of failures
redis.call('sadd', 'ql:failures', group)
-- And add this particular instance to the failed types
redis.call('lpush', 'ql:f:' .. group, self.jid)
-- Increment the count of the failed jobs
@@ -1101,15 +1105,18 @@
-- worker
local job_worker, state = unpack(
redis.call('hmget', QlessJob.ns .. self.jid, 'worker', 'state'))
if job_worker == false then
-- This means the job doesn't exist
- error('Heartbeat(): Job does not exist')
+ error('Heartbeat(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
- error('Heartbeat(): Job not currently running: ' .. state)
+ error(
+ 'Heartbeat(): Job ' .. self.jid .. ' not currently running: ' .. state)
elseif job_worker ~= worker or #job_worker == 0 then
- error('Heartbeat(): Job given out to another worker: ' .. job_worker)
+ error(
+ 'Heartbeat(): Job ' .. self.jid ..
+ ' given out to another worker: ' .. job_worker)
else
-- Otherwise, optionally update the user data, and the heartbeat
if data then
-- I don't know if this is wise, but I'm decoding and encoding
-- the user data to hopefully ensure its sanity
@@ -1117,15 +1124,15 @@
expires, 'worker', worker, 'data', cjson.encode(data))
else
redis.call('hmset', QlessJob.ns .. self.jid,
'expires', expires, 'worker', worker)
end
-
+
    -- Update when this job was last updated on that worker
-- Add this job to the list of jobs handled by this worker
redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid)
-
+
-- And now we should just update the locks
local queue = Qless.queue(
redis.call('hget', QlessJob.ns .. self.jid, 'queue'))
queue.locks.add(expires, self.jid)
return expires
@@ -1142,11 +1149,11 @@
tostring(priority))
-- Get the queue the job is currently in, if any
local queue = redis.call('hget', QlessJob.ns .. self.jid, 'queue')
- if queue == nil then
+ if queue == nil or queue == false then
-- If the job doesn't exist, throw an error
error('Priority(): Job ' .. self.jid .. ' does not exist')
elseif queue == '' then
-- Just adjust the priority
redis.call('hset', QlessJob.ns .. self.jid, 'priority', priority)
@@ -1175,20 +1182,20 @@
-- Times out the job now rather than when its lock is normally set to expire
function QlessJob:timeout(now)
local queue_name, state, worker = unpack(redis.call('hmget',
QlessJob.ns .. self.jid, 'queue', 'state', 'worker'))
- if queue_name == nil then
- error('Timeout(): Job does not exist')
+ if queue_name == nil or queue_name == false then
+ error('Timeout(): Job ' .. self.jid .. ' does not exist')
elseif state ~= 'running' then
error('Timeout(): Job ' .. self.jid .. ' not running')
else
-- Time out the job
self:history(now, 'timed-out')
local queue = Qless.queue(queue_name)
queue.locks.remove(self.jid)
- queue.work.add(now, math.huge, self.jid)
+ queue.work.add(now, '+inf', self.jid)
redis.call('hmset', QlessJob.ns .. self.jid,
'state', 'stalled', 'expires', 0)
local encoded = cjson.encode({
jid = self.jid,
event = 'lock_lost',
@@ -1259,11 +1266,11 @@
local count = tonumber(Qless.config.get('max-job-history', 100))
if count > 0 then
-- We'll always keep the first item around
local obj = redis.call('lpop', QlessJob.ns .. self.jid .. '-history')
redis.call('ltrim', QlessJob.ns .. self.jid .. '-history', -count + 2, -1)
- if obj ~= nil then
+ if obj ~= nil and obj ~= false then
redis.call('lpush', QlessJob.ns .. self.jid .. '-history', obj)
end
end
return redis.call('rpush', QlessJob.ns .. self.jid .. '-history',
cjson.encode({math.floor(now), what, item}))
@@ -1294,12 +1301,15 @@
end, remove = function(...)
if #arg > 0 then
return redis.call('zrem', queue:prefix('work'), unpack(arg))
end
end, add = function(now, priority, jid)
+ if priority ~= '+inf' then
+ priority = priority - (now / 10000000000)
+ end
return redis.call('zadd',
- queue:prefix('work'), priority - (now / 10000000000), jid)
+ queue:prefix('work'), priority, jid)
end, score = function(jid)
return redis.call('zscore', queue:prefix('work'), jid)
end, length = function()
return redis.call('zcard', queue:prefix('work'))
end
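
The new branch above passes the literal '+inf' score (used by timeout() elsewhere in this file) straight through to ZADD instead of folding it into the time-based tiebreaker arithmetic, presumably so the string reaches Redis unchanged. A standalone sketch of the score computation with made-up numbers:

local function work_score(now, priority)
  if priority ~= '+inf' then
    -- small time-based offset so jobs of equal priority keep their order
    return priority - (now / 10000000000)
  end
  return priority            -- '+inf' is a score string Redis accepts as-is
end

print(work_score(1400000000, 5))        -- 4.86
print(work_score(1400000000, '+inf'))   -- +inf
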
@@ -1307,22 +1317,22 @@
-- Access to our locks
queue.locks = {
expired = function(now, offset, count)
return redis.call('zrangebyscore',
- queue:prefix('locks'), -math.huge, now, 'LIMIT', offset, count)
+ queue:prefix('locks'), '-inf', now, 'LIMIT', offset, count)
end, peek = function(now, offset, count)
return redis.call('zrangebyscore', queue:prefix('locks'),
- now, math.huge, 'LIMIT', offset, count)
+ now, '+inf', 'LIMIT', offset, count)
end, add = function(expires, jid)
redis.call('zadd', queue:prefix('locks'), expires, jid)
end, remove = function(...)
if #arg > 0 then
return redis.call('zrem', queue:prefix('locks'), unpack(arg))
end
end, running = function(now)
- return redis.call('zcount', queue:prefix('locks'), now, math.huge)
+ return redis.call('zcount', queue:prefix('locks'), now, '+inf')
end, length = function(now)
-- If a 'now' is provided, we're interested in how many are before
-- that time
if now then
return redis.call('zcount', queue:prefix('locks'), 0, now)
@@ -1451,15 +1461,15 @@
-- The results we'll be sending back
local results = {}
local key = 'ql:s:' .. name .. ':' .. bin .. ':' .. queue
local count, mean, vk = unpack(redis.call('hmget', key, 'total', 'mean', 'vk'))
-
+
count = tonumber(count) or 0
mean = tonumber(mean) or 0
vk = tonumber(vk)
-
+
results.count = count or 0
results.mean = mean or 0
results.histogram = {}
if not count then
@@ -1505,12 +1515,12 @@
-- look for all the recurring jobs that need jobs run
self:check_recurring(now, count - #jids)
  -- Now we've checked __all__ the locks for this queue that could
-- have expired, and are no more than the number requested. If
- -- we still need values in order to meet the demand, then we
- -- should check if any scheduled items, and if so, we should
+ -- we still need values in order to meet the demand, then we
+  -- should check for any scheduled items, and if so, we should
-- insert them to ensure correctness when pulling off the next
-- unit of work.
self:check_scheduled(now, count - #jids)
-- With these in place, we can expand this list of jids based on the work
@@ -1580,12 +1590,12 @@
-- If we still need jobs in order to meet demand, then we should
-- look for all the recurring jobs that need jobs run
self:check_recurring(now, count - #jids)
- -- If we still need values in order to meet the demand, then we
- -- should check if any scheduled items, and if so, we should
+ -- If we still need values in order to meet the demand, then we
+  -- should check for any scheduled items, and if so, we should
-- insert them to ensure correctness when pulling off the next
-- unit of work.
self:check_scheduled(now, count - #jids)
-- With these in place, we can expand this list of jids based on the work
@@ -1603,23 +1613,23 @@
redis.call('hget', QlessJob.ns .. jid, 'time') or now)
local waiting = now - time
self:stat(now, 'wait', waiting)
redis.call('hset', QlessJob.ns .. jid,
'time', string.format("%.20f", now))
-
+
-- Add this job to the list of jobs handled by this worker
redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, jid)
-
+
-- Update the jobs data, and add its locks, and return the job
job:update({
worker = worker,
expires = expires,
state = 'running'
})
-
+
self.locks.add(expires, jid)
-
+
local tracked = redis.call('zscore', 'ql:tracked', jid) ~= false
if tracked then
Qless.publish('popped', jid)
end
end
@@ -1666,11 +1676,11 @@
redis.call('hincrby', key, 'm' .. math.floor(val / 60), 1)
elseif val < 86400 then -- hours
redis.call('hincrby', key, 'h' .. math.floor(val / 3600), 1)
else -- days
redis.call('hincrby', key, 'd' .. math.floor(val / 86400), 1)
- end
+ end
redis.call('hmset', key, 'total', count, 'mean', mean, 'vk', vk)
end
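
For context, the stat histogram above buckets a duration into progressively coarser fields. A standalone sketch of the branches visible in this hunk (the sub-minute branch lies outside the hunk and is omitted here); the field prefixes follow the code:

local function bucket(val)
  if val < 3600 then       return 'm' .. math.floor(val / 60)      -- minutes
  elseif val < 86400 then  return 'h' .. math.floor(val / 3600)    -- hours
  else                     return 'd' .. math.floor(val / 86400)   -- days
  end
end

print(bucket(150))      -- m2
print(bucket(7200))     -- h2
print(bucket(172800))   -- d2
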
-- Put(now, jid, klass, data, delay,
-- [priority, p],
@@ -1726,12 +1736,12 @@
for _, d in ipairs(depends) do new[d] = 1 end
-- Now find what's in the original, but not the new
local original = redis.call(
'smembers', QlessJob.ns .. jid .. '-dependencies')
- for _, dep in pairs(original) do
- if new[dep] == nil then
+ for _, dep in pairs(original) do
+ if new[dep] == nil or new[dep] == false then
        -- Remove dep as a dependency
redis.call('srem', QlessJob.ns .. dep .. '-dependents' , jid)
redis.call('srem', QlessJob.ns .. jid .. '-dependencies', dep)
end
end
@@ -1849,11 +1859,11 @@
self.work.add(now, priority, jid)
end
end
-- Lastly, we're going to make sure that this item is in the
- -- set of known queues. We should keep this sorted by the
+ -- set of known queues. We should keep this sorted by the
-- order in which we saw each of these queues
if redis.call('zscore', 'ql:queues', self.name) == false then
redis.call('zadd', 'ql:queues', now, self.name)
end
@@ -1919,11 +1929,11 @@
-- Read in all the optional parameters. All of these must come in
-- pairs, so if we have an odd number of extra args, raise an error
if #arg % 2 == 1 then
error('Odd number of additional args: ' .. tostring(arg))
end
-
+
-- Read in all the optional parameters
local options = {}
for i = 3, #arg, 2 do options[arg[i]] = arg[i + 1] end
options.tags = assert(cjson.decode(options.tags or '{}'),
'Recur(): Arg "tags" must be JSON string array: ' .. tostring(
@@ -1939,16 +1949,16 @@
options.backlog))
local count, old_queue = unpack(redis.call('hmget', 'ql:r:' .. jid, 'count', 'queue'))
count = count or 0
- -- If it has previously been in another queue, then we should remove
+ -- If it has previously been in another queue, then we should remove
-- some information about it
if old_queue then
Qless.queue(old_queue).recurring.remove(jid)
end
-
+
-- Do some insertions
redis.call('hmset', 'ql:r:' .. jid,
'jid' , jid,
'klass' , klass,
'data' , raw_data,
@@ -1962,18 +1972,18 @@
'interval', interval,
'retries' , options.retries,
'backlog' , options.backlog)
-- Now, we should schedule the next run of the job
self.recurring.add(now + offset, jid)
-
+
-- Lastly, we're going to make sure that this item is in the
- -- set of known queues. We should keep this sorted by the
+ -- set of known queues. We should keep this sorted by the
-- order in which we saw each of these queues
if redis.call('zscore', 'ql:queues', self.name) == false then
redis.call('zadd', 'ql:queues', now, self.name)
end
-
+
return jid
else
error('Recur(): schedule type "' .. tostring(spec) .. '" unknown')
end
end
@@ -2015,26 +2025,26 @@
score = score + (
math.ceil(num - backlog) * interval
)
end
end
-
- -- We're saving this value so that in the history, we can accurately
+
+ -- We're saving this value so that in the history, we can accurately
-- reflect when the job would normally have been scheduled
while (score <= now) and (moved < count) do
local count = redis.call('hincrby', 'ql:r:' .. jid, 'count', 1)
moved = moved + 1
local child_jid = jid .. '-' .. count
-
+
-- Add this job to the list of jobs tagged with whatever tags were
-- supplied
for i, tag in ipairs(_tags) do
redis.call('zadd', 'ql:t:' .. tag, now, child_jid)
redis.call('zincrby', 'ql:tags', 1, tag)
end
-
+
-- First, let's save its data
redis.call('hmset', QlessJob.ns .. child_jid,
'jid' , child_jid,
'klass' , klass,
'data' , data,
@@ -2047,16 +2057,16 @@
'retries' , retries,
'remaining' , retries,
'time' , string.format("%.20f", score),
'spawned_from_jid', jid)
Qless.job(child_jid):history(score, 'put', {q = self.name})
-
+
-- Now, if a delay was provided, and if it's in the future,
-- then we'll have to schedule it. Otherwise, we're just
-- going to add it to the work queue.
self.work.add(score, priority, child_jid)
-
+
score = score + interval
self.recurring.add(score, jid)
end
end
end
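
The surrounding while loop is a catch-up loop: it spawns one child job per missed interval until the schedule passes `now` or the per-call budget runs out. A standalone sketch of just that progression, with hypothetical numbers and none of the redis bookkeeping:

local jid, interval = 'nightly-report', 60
local score, now, moved, budget, count = 100, 350, 0, 10, 0
while (score <= now) and (moved < budget) do
  count = count + 1
  moved = moved + 1
  local child_jid = jid .. '-' .. count          -- same child-jid naming as above
  print(('spawn %s for time %d'):format(child_jid, score))
  score = score + interval
end
-- spawns nightly-report-1 through nightly-report-5 (times 100 .. 340)
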
@@ -2067,11 +2077,11 @@
function QlessQueue:check_scheduled(now, count)
-- zadd is a list of arguments that we'll be able to use to
-- insert into the work queue
local scheduled = self.scheduled.ready(now, 0, count)
for index, jid in ipairs(scheduled) do
- -- With these in hand, we'll have to go out and find the
+ -- With these in hand, we'll have to go out and find the
-- priorities of these jobs, and then we'll insert them
-- into the work queue and then when that's complete, we'll
-- remove them from the scheduled queue
local priority = tonumber(
redis.call('hget', QlessJob.ns .. jid, 'priority') or 0)
@@ -2152,19 +2162,19 @@
redis.call('hdel', QlessJob.ns .. jid, 'grace', 0)
-- See how many remaining retries the job has
local remaining = tonumber(redis.call(
'hincrby', QlessJob.ns .. jid, 'remaining', -1))
-
+
-- This is where we actually have to time out the work
if remaining < 0 then
-- Now remove the instance from the schedule, and work queues
-- for the queue it's in
self.work.remove(jid)
self.locks.remove(jid)
self.scheduled.remove(jid)
-
+
local group = 'failed-retries-' .. Qless.job(jid):data()['queue']
local job = Qless.job(jid)
job:history(now, 'failed', {group = group})
redis.call('hmset', QlessJob.ns .. jid, 'state', 'failed',
'worker', '',
@@ -2176,16 +2186,16 @@
['message'] =
'Job exhausted retries in queue "' .. self.name .. '"',
['when'] = now,
['worker'] = unpack(job:data('worker'))
}))
-
+
-- Add this type of failure to the list of failures
redis.call('sadd', 'ql:failures', group)
-- And add this particular instance to the failed types
redis.call('lpush', 'ql:f:' .. group, jid)
-
+
if redis.call('zscore', 'ql:tracked', jid) ~= false then
Qless.publish('failed', jid)
end
Qless.publish('log', cjson.encode({
jid = jid,
@@ -2258,15 +2268,15 @@
-- Get all the attributes of this particular job
function QlessRecurringJob:data()
local job = redis.call(
'hmget', 'ql:r:' .. self.jid, 'jid', 'klass', 'state', 'queue',
'priority', 'interval', 'retries', 'count', 'data', 'tags', 'backlog')
-
+
if not job[1] then
return nil
end
-
+
return {
jid = job[1],
klass = job[2],
state = job[3],
queue = job[4],
@@ -2285,11 +2295,11 @@
-- - interval
-- - retries
-- - data
-- - klass
-- - queue
--- - backlog
+-- - backlog
function QlessRecurringJob:update(now, ...)
local options = {}
-- Make sure that the job exists
if redis.call('exists', 'ql:r:' .. self.jid) ~= 0 then
for i = 1, #arg, 2 do
@@ -2343,14 +2353,14 @@
if tags then
-- Decode the json blob, convert to dictionary
tags = cjson.decode(tags)
local _tags = {}
for i,v in ipairs(tags) do _tags[v] = true end
-
+
    -- Otherwise, add the job to the sorted set with those tags
- for i=1,#arg do if _tags[arg[i]] == nil then table.insert(tags, arg[i]) end end
-
+ for i=1,#arg do if _tags[arg[i]] == nil or _tags[arg[i]] == false then table.insert(tags, arg[i]) end end
+
tags = cjson.encode(tags)
redis.call('hset', 'ql:r:' .. self.jid, 'tags', tags)
return tags
else
error('Tag(): Job ' .. self.jid .. ' does not exist')
@@ -2402,23 +2412,23 @@
end
-- Provide data about all the workers, or if a specific worker is provided,
-- then which jobs that worker is responsible for. If no worker is provided,
-- expect a response of the form:
---
+--
-- [
-- # This is sorted by the recency of activity from that worker
-- {
-- 'name' : 'hostname1-pid1',
-- 'jobs' : 20,
-- 'stalled': 0
-- }, {
-- ...
-- }
-- ]
---
+--
-- If a worker id is provided, then expect a response of the form:
---
+--
-- {
-- 'jobs': [
-- jid1,
-- jid2,
-- ...