lib/qless/queue.rb in qless-0.9.3 vs lib/qless/queue.rb in qless-0.10.0
- old
+ new
@@ -1,53 +1,62 @@
-require "qless/job"
-require "redis"
-require "json"
+# Encoding: utf-8
+require 'qless/job'
+require 'redis'
+require 'json'
+
module Qless
+ # A class for interacting with jobs in different states in a queue. Not meant
+ # to be instantiated directly, it's accessed with Queue#jobs
class QueueJobs
def initialize(name, client)
@name = name
@client = client
end
- def running(start=0, count=25)
- @client._jobs.call([], ['running', Time.now.to_f, @name, start, count])
+ def running(start = 0, count = 25)
+ @client.call('jobs', 'running', @name, start, count)
end
- def stalled(start=0, count=25)
- @client._jobs.call([], ['stalled', Time.now.to_f, @name, start, count])
+ def stalled(start = 0, count = 25)
+ @client.call('jobs', 'stalled', @name, start, count)
end
- def scheduled(start=0, count=25)
- @client._jobs.call([], ['scheduled', Time.now.to_f, @name, start, count])
+ def scheduled(start = 0, count = 25)
+ @client.call('jobs', 'scheduled', @name, start, count)
end
- def depends(start=0, count=25)
- @client._jobs.call([], ['depends', Time.now.to_f, @name, start, count])
+ def depends(start = 0, count = 25)
+ @client.call('jobs', 'depends', @name, start, count)
end
- def recurring(start=0, count=25)
- @client._jobs.call([], ['recurring', Time.now.to_f, @name, start, count])
+ def recurring(start = 0, count = 25)
+ @client.call('jobs', 'recurring', @name, start, count)
end
end
+ # A class for interacting with a specific queue. Not meant to be instantiated
+ # directly, it's accessed with Client#queues[...]
class Queue
attr_reader :name, :client
- attr_accessor :worker_name
def initialize(name, client)
@client = client
@name = name
- self.worker_name = Qless.worker_name
end
+ # Our worker name is the same as our client's
+ def worker_name
+ @client.worker_name
+ end
+
def jobs
@jobs ||= QueueJobs.new(@name, @client)
end
def counts
- JSON.parse(@client._queues.call([], [Time.now.to_i, @name]))
+ JSON.parse(@client.call('queues', @name))
end
def heartbeat
get_config :heartbeat
end
@@ -55,98 +64,124 @@
def heartbeat=(value)
set_config :heartbeat, value
end
def max_concurrency
- value = get_config(:"max-concurrency")
+ value = get_config('max-concurrency')
value && Integer(value)
end
def max_concurrency=(value)
- set_config :"max-concurrency", value
+ set_config 'max-concurrency', value
end
- def pause
- @client._pause.call([], [name])
+ def paused?
+ counts['paused']
end
+ def pause(opts = {})
+ @client.call('pause', name)
+ @client.call('timeout', jobs.running(0, -1)) unless opts[:stopjobs].nil?
+ end
+
def unpause
- @client._unpause.call([], [name])
+ @client.call('unpause', name)
end
+ QueueNotEmptyError = Class.new(StandardError)
+
+ def forget
+ job_count = length
+ if job_count.zero?
+ @client.call('queue.forget', name)
+ else
+ raise QueueNotEmptyError, "The queue is not empty. It has #{job_count} jobs."
+ end
+ end
+
# Put the described job in this queue
# Options include:
# => priority (int)
# => tags (array of strings)
# => delay (int)
- def put(klass, data, opts={})
+ def put(klass, data, opts = {})
opts = job_options(klass, data, opts)
-
- @client._put.call([@name], [
- (opts[:jid] or Qless.generate_jid),
- klass.name,
- JSON.generate(data),
- Time.now.to_f,
- opts.fetch(:delay, 0),
- 'priority', opts.fetch(:priority, 0),
- 'tags', JSON.generate(opts.fetch(:tags, [])),
- 'retries', opts.fetch(:retries, 5),
- 'depends', JSON.generate(opts.fetch(:depends, []))
- ])
+ @client.call('put', worker_name, @name,
+ (opts[:jid] || Qless.generate_jid),
+ klass.is_a?(String) ? klass : klass.name,
+ JSON.generate(data),
+ opts.fetch(:delay, 0),
+ 'priority', opts.fetch(:priority, 0),
+ 'tags', JSON.generate(opts.fetch(:tags, [])),
+ 'retries', opts.fetch(:retries, 5),
+ 'depends', JSON.generate(opts.fetch(:depends, []))
+ )
end
# Make a recurring job in this queue
# Options include:
# => priority (int)
# => tags (array of strings)
# => retries (int)
# => offset (int)
- def recur(klass, data, interval, opts={})
+ def recur(klass, data, interval, opts = {})
opts = job_options(klass, data, opts)
-
- @client._recur.call([], [
- 'on',
+ @client.call(
+ 'recur',
@name,
- (opts[:jid] or Qless.generate_jid),
- klass.to_s,
+ (opts[:jid] || Qless.generate_jid),
+ klass.is_a?(String) ? klass : klass.name,
JSON.generate(data),
- Time.now.to_f,
'interval', interval, opts.fetch(:offset, 0),
'priority', opts.fetch(:priority, 0),
'tags', JSON.generate(opts.fetch(:tags, [])),
- 'retries', opts.fetch(:retries, 5)
- ])
+ 'retries', opts.fetch(:retries, 5),
+ 'backlog', opts.fetch(:backlog, 0)
+ )
end
# Pop a work item off the queue
- def pop(count=nil)
- results = @client._pop.call([@name], [worker_name, (count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) }
- count.nil? ? results[0] : results
+ def pop(count = nil)
+ jids = JSON.parse(@client.call('pop', @name, worker_name, (count || 1)))
+ jobs = jids.map { |j| Job.new(@client, j) }
+ count.nil? ? jobs[0] : jobs
end
# Peek at a work item
- def peek(count=nil)
- results = @client._peek.call([@name], [(count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) }
- count.nil? ? results[0] : results
+ def peek(count = nil)
+ jids = JSON.parse(@client.call('peek', @name, (count || 1)))
+ jobs = jids.map { |j| Job.new(@client, j) }
+ count.nil? ? jobs[0] : jobs
end
- def stats(date=nil)
- JSON.parse(@client._stats.call([], [@name, (date || Time.now.to_f)]))
+ def stats(date = nil)
+ JSON.parse(@client.call('stats', @name, (date || Time.now.to_f)))
end
# How many items in the queue?
def length
(@client.redis.multi do
- @client.redis.zcard("ql:q:#{@name}-locks")
- @client.redis.zcard("ql:q:#{@name}-work")
- @client.redis.zcard("ql:q:#{@name}-scheduled")
+ %w[ locks work scheduled depends ].each do |suffix|
+ @client.redis.zcard("ql:q:#{@name}-#{suffix}")
+ end
end).inject(0, :+)
end
def to_s
"#<Qless::Queue #{@name}>"
end
- alias inspect to_s
+ alias_method :inspect, :to_s
+
+ def ==(other)
+ self.class == other.class &&
+ client == other.client &&
+ name.to_s == other.name.to_s
+ end
+ alias eql? ==
+
+ def hash
+ self.class.hash ^ client.hash ^ name.to_s.hash
+ end
private
def job_options(klass, data, opts)
return opts unless klass.respond_to?(:default_job_options)