require 'redis'

# Prefer yajl when it is installed, otherwise fall back to json.
# A failed require raises LoadError, which is a ScriptError and is therefore
# NOT caught by a bare `rescue` (that only handles StandardError), so the
# exception class has to be named explicitly for the fallback to run.
begin
  require 'yajl'
rescue LoadError
  require 'json'
end

$LOAD_PATH << './lib'

require 'kthxbye/config'
require 'kthxbye/helper'
require 'kthxbye/job'
require 'kthxbye/worker'
require 'kthxbye/failure'
require 'kthxbye/version'
require 'kthxbye/exceptions'

# Top-level entry point: owns the shared Redis connection and exposes
# queue, job, worker and result lookups as module-level methods
# (the module extends itself).
#
# `decode` and `log` are not defined in this file; presumably they are
# supplied by the included Helper module.
module Kthxbye
  include Helper
  extend self

  # Method names served dynamically by method_missing:
  # data_peek and result_peek.
  PEEK_METHOD = /\A(data|result)_peek\z/.freeze

  # Stores the Redis connection used by every other method.
  #
  # redis_instance - an existing Redis client to reuse. When nil, a new
  #                  client is built from Config.options[:redis_server]
  #                  and Config.options[:redis_port].
  #
  # Returns the connection that was stored.
  def connect(redis_instance = nil)
    @redis = redis_instance || Redis.new(
      :host => Config.options[:redis_server],
      :port => Config.options[:redis_port]
    )
  end

  # Returns the current Redis connection. On first use (nothing stored yet)
  # it calls Config.setup and then connect with no arguments.
  def redis
    return @redis if @redis

    Config.setup
    connect
  end

  # Returns every key in the connected Redis database (KEYS *).
  def keys
    redis.keys("*")
  end

  # Hands queue, klass and args straight to Job.create and returns
  # whatever Job.create returns.
  def enqueue(queue, klass, *args)
    Job.create(queue, klass, *args)
  end

  # Gets the size of a given queue: the length of the "queue:<queue>"
  # list, as an Integer.
  def size(queue)
    redis.llen("queue:#{queue}").to_i
  end

  # Pops the next job id off the front of the given queue.
  #
  # Returns a Job built from the id, the queue name and the decoded payload
  # stored under that id in "data-store:<q>", or nil (after logging) when
  # the queue had nothing to pop.
  def salvage(q)
    id = redis.lpop("queue:#{q}")

    unless id
      log "No jobs found in #{q}"
      return nil
    end

    payload = decode(redis.hget("data-store:#{q}", id))
    Job.new(id, q, payload)
  end

  # Lets us peek at the data held in the "<store>-store:<queue>" hash.
  #
  # With an id, returns the decoded value for that single entry.
  # Without an id, returns a Hash of every entry id => decoded value.
  def peek(store, queue, id = nil)
    key = "#{store}-store:#{queue}"
    return decode(redis.hget(key, id)) if id

    redis.hgetall(key).each_with_object({}) do |(entry_id, raw), results|
      results[entry_id] = decode(raw)
    end
  end

  # Handles our dynamic methods: data_peek(queue, id = nil) and
  # result_peek(queue, id = nil) are forwarded to Kthxbye.peek with
  # "data" / "result" as the store. Any other name goes to super.
  def method_missing(name, *args)
    match = PEEK_METHOD.match(name.to_s)

    if match
      Kthxbye.peek(match[1], *args)
    else
      super
    end
  end

  # Keeps respond_to? truthful for the names method_missing handles.
  def respond_to_missing?(name, include_private = false)
    PEEK_METHOD.match(name.to_s) ? true : super
  end

  # Returns all the queues Kthxbye knows about (members of the :queues
  # set), sorted.
  def queues
    redis.smembers(:queues).sort
  end

  # Registers the queue in our "known queues" set. Nothing is written
  # (and nil is returned) when the queue is already a member.
  def register_queue(queue)
    redis.sadd(:queues, queue) unless redis.sismember(:queues, queue)
  end

  # Removes the queue from the known-queues set only. The "queue:<queue>"
  # list itself is NOT deleted, which leaves a phantom queue behind;
  # use delete_queue for complete removal.
  def unregister_queue(queue)
    redis.srem(:queues, queue)
  end

  # Completely removes a queue: unregisters it, then deletes its
  # "queue:<queue>" key.
  #
  # Always returns true. The reply from DEL is deliberately discarded:
  # it is an Integer, and since 0 is truthy in Ruby, `del(...) || true`
  # could never actually yield true.
  def delete_queue(queue)
    unregister_queue(queue)
    redis.del("queue:#{queue}")
    true
  end

  # Returns all our registered workers: Worker.find is called for each
  # member of the :workers set.
  def workers
    redis.smembers(:workers).map { |worker_id| Worker.find(worker_id) }
  end

  # Returns all our active workers and the job each is working on, as an
  # Array of [worker_id, decoded value of "worker:<worker_id>"] pairs
  # taken from the :working set.
  def working
    redis.smembers(:working).map do |worker_id|
      [worker_id, decode(redis.get("worker:#{worker_id}"))]
    end
  end

  # Returns job results from the "result-store:<queue>" hash.
  #
  # With an id, returns the decoded result for that job.
  # Without an id, returns the whole hash coerced with Array(); note that
  # the values in this form are NOT decoded.
  def job_results(queue, id = nil)
    key = "result-store:#{queue}"

    if id
      decode(redis.hget(key, id))
    else
      Array(redis.hgetall(key))
    end
  end

  # Returns a summary Hash: the Version constant, counts of keys, workers,
  # working workers and queues, Failure.count, and the combined size of
  # all known queues as :pending.
  def inspect
    {
      :version => Version,
      :keys    => keys.size,
      :workers => workers.size,
      :working => working.size,
      :queues  => queues.size,
      :failed  => Failure.count,
      :pending => queues.inject(0) { |total, queue| total + size(queue) }
    }
  end
end