lib/qless.rb in qless-0.9.2 vs lib/qless.rb in qless-0.9.3
- old
+ new
@@ -1,19 +1,32 @@
require "socket"
require "redis"
require "json"
require "securerandom"
+module Qless
+ # Define our error base class before requiring the other
+ # files so they can define subclasses.
+ Error = Class.new(StandardError)
+
+ # to maintain backwards compatibility with v2.x of that gem we need this constant because:
+ # * (lua.rb) the #evalsha method signature changed between v2.x and v3.x of the redis ruby gem
+ # * (worker.rb) in v3.x you have to reconnect to the redis server after forking the process
+ USING_LEGACY_REDIS_VERSION = ::Redis::VERSION.to_f < 3.0
+end
+
require "qless/version"
require "qless/config"
require "qless/queue"
require "qless/job"
-require "qless/lua"
+require "qless/lua_script"
module Qless
extend self
+ UnsupportedRedisVersionError = Class.new(Error)
+
def generate_jid
SecureRandom.uuid.gsub('-', '')
end
def stringify_hash_keys(hash)
@@ -25,41 +38,39 @@
# This is a unique identifier for the worker
def worker_name
@worker_name ||= [Socket.gethostname, Process.pid.to_s].join('-')
end
- class UnsupportedRedisVersionError < StandardError; end
-
class ClientJobs
def initialize(client)
@client = client
end
-
+
def complete(offset=0, count=25)
@client._jobs.call([], ['complete', offset, count])
end
-
+
def tracked
results = JSON.parse(@client._track.call([], []))
results['jobs'] = results['jobs'].map { |j| Job.new(@client, j) }
results
end
-
+
def tagged(tag, offset=0, count=25)
JSON.parse(@client._tag.call([], ['get', tag, offset, count]))
end
-
+
def failed(t=nil, start=0, limit=25)
if not t
JSON.parse(@client._failed.call([], []))
else
results = JSON.parse(@client._failed.call([], [t, start, limit]))
results['jobs'] = results['jobs'].map { |j| Job.new(@client, j) }
results
end
end
-
+
def [](id)
results = @client._get.call([], [id])
if results.nil?
results = @client._recur.call([], ['get', id])
if results.nil?
@@ -68,54 +79,54 @@
return RecurringJob.new(@client, JSON.parse(results))
end
Job.new(@client, JSON.parse(results))
end
end
-
+
class ClientWorkers
def initialize(client)
@client = client
end
-
+
def counts
JSON.parse(@client._workers.call([], [Time.now.to_i]))
end
-
+
def [](name)
JSON.parse(@client._workers.call([], [Time.now.to_i, name]))
end
end
-
+
class ClientQueues
def initialize(client)
@client = client
end
-
+
def counts
JSON.parse(@client._queues.call([], [Time.now.to_i]))
end
-
+
def [](name)
Queue.new(name, @client)
end
end
-
+
class ClientEvents
def initialize(redis)
@redis = redis
@actions = Hash.new()
end
-
+
def canceled(&block) ; @actions[:canceled ] = block; end
def completed(&block); @actions[:completed] = block; end
def failed(&block) ; @actions[:failed ] = block; end
def popped(&block) ; @actions[:popped ] = block; end
def stalled(&block) ; @actions[:stalled ] = block; end
def put(&block) ; @actions[:put ] = block; end
def track(&block) ; @actions[:track ] = block; end
def untrack(&block) ; @actions[:untrack ] = block; end
-
+
def listen
yield(self) if block_given?
@redis.subscribe(:canceled, :completed, :failed, :popped, :stalled, :put, :track, :untrack) do |on|
on.message do |channel, message|
callback = @actions[channel.to_sym]
@@ -123,64 +134,80 @@
callback.call(message)
end
end
end
end
-
+
def stop
@redis.unsubscribe
end
end
-
+
class Client
# Lua scripts
attr_reader :_cancel, :_config, :_complete, :_fail, :_failed, :_get, :_heartbeat, :_jobs, :_peek, :_pop
attr_reader :_priority, :_put, :_queues, :_recur, :_retry, :_stats, :_tag, :_track, :_workers, :_depends
+ attr_reader :_pause, :_unpause, :_deregister_workers
# A real object
attr_reader :config, :redis, :jobs, :queues, :workers
-
+
def initialize(options = {})
# This is the redis instance we're connected to
@redis = options[:redis] || Redis.connect(options) # use connect so REDIS_URL will be honored
@options = options
assert_minimum_redis_version("2.5.5")
@config = Config.new(self)
['cancel', 'config', 'complete', 'depends', 'fail', 'failed', 'get', 'heartbeat', 'jobs', 'peek', 'pop',
- 'priority', 'put', 'queues', 'recur', 'retry', 'stats', 'tag', 'track', 'workers'].each do |f|
- self.instance_variable_set("@_#{f}", Lua.new(f, @redis))
+ 'priority', 'put', 'queues', 'recur', 'retry', 'stats', 'tag', 'track', 'workers', 'pause', 'unpause',
+ 'deregister_workers'].each do |f|
+ self.instance_variable_set("@_#{f}", Qless::LuaScript.new(f, @redis))
end
-
+
@jobs = ClientJobs.new(self)
@queues = ClientQueues.new(self)
@workers = ClientWorkers.new(self)
end
def inspect
"<Qless::Client #{@options} >"
end
-
+
def events
# Events needs its own redis instance of the same configuration, because
# once it's subscribed, we can only use pub-sub-like commands. This way,
# we still have access to the client in the normal case
@events ||= ClientEvents.new(Redis.connect(@options))
end
-
+
def track(jid)
@_track.call([], ['track', jid, Time.now.to_i])
end
-
+
def untrack(jid)
@_track.call([], ['untrack', jid, Time.now.to_i])
end
-
+
def tags(offset=0, count=100)
JSON.parse(@_tag.call([], ['top', offset, count]))
end
+
+ def deregister_workers(*worker_names)
+ _deregister_workers.call([], worker_names)
+ end
+
+ def bulk_cancel(jids)
+ @_cancel.call([], jids)
+ end
+
+ def new_redis_connection
+ ::Redis.new(url: redis.id)
+ end
+
private
def assert_minimum_redis_version(version)
- redis_version = @redis.info.fetch("redis_version")
+ # remove the "-pre2" from "2.6.8-pre2"
+ redis_version = @redis.info.fetch("redis_version").split('-').first
return if Gem::Version.new(redis_version) >= Gem::Version.new(version)
raise UnsupportedRedisVersionError,
"You are running redis #{redis_version}, but qless requires at least #{version}"
end