lib/qless/queue.rb in qless-0.9.3 vs lib/qless/queue.rb in qless-0.10.0

- old
+ new

@@ -1,53 +1,62 @@ -require "qless/job" -require "redis" -require "json" +# Encoding: utf-8 +require 'qless/job' +require 'redis' +require 'json' + module Qless + # A class for interacting with jobs in different states in a queue. Not meant + # to be instantiated directly, it's accessed with Queue#jobs class QueueJobs def initialize(name, client) @name = name @client = client end - def running(start=0, count=25) - @client._jobs.call([], ['running', Time.now.to_f, @name, start, count]) + def running(start = 0, count = 25) + @client.call('jobs', 'running', @name, start, count) end - def stalled(start=0, count=25) - @client._jobs.call([], ['stalled', Time.now.to_f, @name, start, count]) + def stalled(start = 0, count = 25) + @client.call('jobs', 'stalled', @name, start, count) end - def scheduled(start=0, count=25) - @client._jobs.call([], ['scheduled', Time.now.to_f, @name, start, count]) + def scheduled(start = 0, count = 25) + @client.call('jobs', 'scheduled', @name, start, count) end - def depends(start=0, count=25) - @client._jobs.call([], ['depends', Time.now.to_f, @name, start, count]) + def depends(start = 0, count = 25) + @client.call('jobs', 'depends', @name, start, count) end - def recurring(start=0, count=25) - @client._jobs.call([], ['recurring', Time.now.to_f, @name, start, count]) + def recurring(start = 0, count = 25) + @client.call('jobs', 'recurring', @name, start, count) end end + # A class for interacting with a specific queue. Not meant to be instantiated + # directly, it's accessed with Client#queues[...] class Queue attr_reader :name, :client - attr_accessor :worker_name def initialize(name, client) @client = client @name = name - self.worker_name = Qless.worker_name end + # Our worker name is the same as our client's + def worker_name + @client.worker_name + end + def jobs @jobs ||= QueueJobs.new(@name, @client) end def counts - JSON.parse(@client._queues.call([], [Time.now.to_i, @name])) + JSON.parse(@client.call('queues', @name)) end def heartbeat get_config :heartbeat end @@ -55,98 +64,124 @@ def heartbeat=(value) set_config :heartbeat, value end def max_concurrency - value = get_config(:"max-concurrency") + value = get_config('max-concurrency') value && Integer(value) end def max_concurrency=(value) - set_config :"max-concurrency", value + set_config 'max-concurrency', value end - def pause - @client._pause.call([], [name]) + def paused? + counts['paused'] end + def pause(opts = {}) + @client.call('pause', name) + @client.call('timeout', jobs.running(0, -1)) unless opts[:stopjobs].nil? + end + def unpause - @client._unpause.call([], [name]) + @client.call('unpause', name) end + QueueNotEmptyError = Class.new(StandardError) + + def forget + job_count = length + if job_count.zero? + @client.call('queue.forget', name) + else + raise QueueNotEmptyError, "The queue is not empty. It has #{job_count} jobs." + end + end + # Put the described job in this queue # Options include: # => priority (int) # => tags (array of strings) # => delay (int) - def put(klass, data, opts={}) + def put(klass, data, opts = {}) opts = job_options(klass, data, opts) - - @client._put.call([@name], [ - (opts[:jid] or Qless.generate_jid), - klass.name, - JSON.generate(data), - Time.now.to_f, - opts.fetch(:delay, 0), - 'priority', opts.fetch(:priority, 0), - 'tags', JSON.generate(opts.fetch(:tags, [])), - 'retries', opts.fetch(:retries, 5), - 'depends', JSON.generate(opts.fetch(:depends, [])) - ]) + @client.call('put', worker_name, @name, + (opts[:jid] || Qless.generate_jid), + klass.is_a?(String) ? klass : klass.name, + JSON.generate(data), + opts.fetch(:delay, 0), + 'priority', opts.fetch(:priority, 0), + 'tags', JSON.generate(opts.fetch(:tags, [])), + 'retries', opts.fetch(:retries, 5), + 'depends', JSON.generate(opts.fetch(:depends, [])) + ) end # Make a recurring job in this queue # Options include: # => priority (int) # => tags (array of strings) # => retries (int) # => offset (int) - def recur(klass, data, interval, opts={}) + def recur(klass, data, interval, opts = {}) opts = job_options(klass, data, opts) - - @client._recur.call([], [ - 'on', + @client.call( + 'recur', @name, - (opts[:jid] or Qless.generate_jid), - klass.to_s, + (opts[:jid] || Qless.generate_jid), + klass.is_a?(String) ? klass : klass.name, JSON.generate(data), - Time.now.to_f, 'interval', interval, opts.fetch(:offset, 0), 'priority', opts.fetch(:priority, 0), 'tags', JSON.generate(opts.fetch(:tags, [])), - 'retries', opts.fetch(:retries, 5) - ]) + 'retries', opts.fetch(:retries, 5), + 'backlog', opts.fetch(:backlog, 0) + ) end # Pop a work item off the queue - def pop(count=nil) - results = @client._pop.call([@name], [worker_name, (count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) } - count.nil? ? results[0] : results + def pop(count = nil) + jids = JSON.parse(@client.call('pop', @name, worker_name, (count || 1))) + jobs = jids.map { |j| Job.new(@client, j) } + count.nil? ? jobs[0] : jobs end # Peek at a work item - def peek(count=nil) - results = @client._peek.call([@name], [(count || 1), Time.now.to_f]).map { |j| Job.new(@client, JSON.parse(j)) } - count.nil? ? results[0] : results + def peek(count = nil) + jids = JSON.parse(@client.call('peek', @name, (count || 1))) + jobs = jids.map { |j| Job.new(@client, j) } + count.nil? ? jobs[0] : jobs end - def stats(date=nil) - JSON.parse(@client._stats.call([], [@name, (date || Time.now.to_f)])) + def stats(date = nil) + JSON.parse(@client.call('stats', @name, (date || Time.now.to_f))) end # How many items in the queue? def length (@client.redis.multi do - @client.redis.zcard("ql:q:#{@name}-locks") - @client.redis.zcard("ql:q:#{@name}-work") - @client.redis.zcard("ql:q:#{@name}-scheduled") + %w[ locks work scheduled depends ].each do |suffix| + @client.redis.zcard("ql:q:#{@name}-#{suffix}") + end end).inject(0, :+) end def to_s "#<Qless::Queue #{@name}>" end - alias inspect to_s + alias_method :inspect, :to_s + + def ==(other) + self.class == other.class && + client == other.client && + name.to_s == other.name.to_s + end + alias eql? == + + def hash + self.class.hash ^ client.hash ^ name.to_s.hash + end private def job_options(klass, data, opts) return opts unless klass.respond_to?(:default_job_options)