lib/qless/queue.rb in qless-0.9.2 vs lib/qless/queue.rb in qless-0.9.3
- old
+ new
@@ -1,64 +1,80 @@
-require "qless/lua"
require "qless/job"
require "redis"
require "json"
module Qless
class QueueJobs
def initialize(name, client)
@name = name
@client = client
end
-
+
def running(start=0, count=25)
@client._jobs.call([], ['running', Time.now.to_f, @name, start, count])
end
-
+
def stalled(start=0, count=25)
@client._jobs.call([], ['stalled', Time.now.to_f, @name, start, count])
end
-
+
def scheduled(start=0, count=25)
@client._jobs.call([], ['scheduled', Time.now.to_f, @name, start, count])
end
-
+
def depends(start=0, count=25)
@client._jobs.call([], ['depends', Time.now.to_f, @name, start, count])
end
-
+
def recurring(start=0, count=25)
@client._jobs.call([], ['recurring', Time.now.to_f, @name, start, count])
end
end
-
+
class Queue
- attr_reader :name
+ attr_reader :name, :client
attr_accessor :worker_name
-
+
def initialize(name, client)
@client = client
@name = name
self.worker_name = Qless.worker_name
end
-
+
def jobs
@jobs ||= QueueJobs.new(@name, @client)
end
-
+
def counts
JSON.parse(@client._queues.call([], [Time.now.to_i, @name]))
end
-
+
def heartbeat
- @client.config["#{@name}-heartbeat"]
+ get_config :heartbeat
end
-
+
def heartbeat=(value)
- @client.config["#{@name}-heartbeat"] = value
+ set_config :heartbeat, value
end
-
+
+ def max_concurrency
+ value = get_config(:"max-concurrency")
+ value && Integer(value)
+ end
+
+ def max_concurrency=(value)
+ set_config :"max-concurrency", value
+ end
+
+ def pause
+ @client._pause.call([], [name])
+ end
+
+ def unpause
+ @client._unpause.call([], [name])
+ end
+
# Put the described job in this queue
# Options include:
# => priority (int)
# => tags (array of strings)
# => delay (int)
@@ -75,11 +91,11 @@
'tags', JSON.generate(opts.fetch(:tags, [])),
'retries', opts.fetch(:retries, 5),
'depends', JSON.generate(opts.fetch(:depends, []))
])
end
-
+
# Make a recurring job in this queue
# Options include:
# => priority (int)
# => tags (array of strings)
# => retries (int)
@@ -98,27 +114,27 @@
'priority', opts.fetch(:priority, 0),
'tags', JSON.generate(opts.fetch(:tags, [])),
'retries', opts.fetch(:retries, 5)
])
end
-
+
# Pop a work item off the queue
def pop(count=nil)
results = @client._pop.call([@name], [worker_name, (count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) }
count.nil? ? results[0] : results
end
-
+
# Peek at a work item
def peek(count=nil)
results = @client._peek.call([@name], [(count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) }
count.nil? ? results[0] : results
end
-
+
def stats(date=nil)
JSON.parse(@client._stats.call([], [@name, (date || Time.now.to_f)]))
end
-
+
# How many items in the queue?
def length
(@client.redis.multi do
@client.redis.zcard("ql:q:#{@name}-locks")
@client.redis.zcard("ql:q:#{@name}-work")
@@ -134,8 +150,16 @@
private
def job_options(klass, data, opts)
return opts unless klass.respond_to?(:default_job_options)
klass.default_job_options(data).merge(opts)
+ end
+
+ def set_config(config, value)
+ @client.config["#{@name}-#{config}"] = value
+ end
+
+ def get_config(config)
+ @client.config["#{@name}-#{config}"]
end
end
end