require 'redis' begin require 'yajl' rescue require 'json' end require 'kthxbye/config' require 'kthxbye/helper' require 'kthxbye/job' require 'kthxbye/worker' require 'kthxbye/failure' require 'kthxbye/stats' require 'kthxbye/version' require 'kthxbye/exceptions' require 'kthxbye/railtie' if defined?(Rails) && Rails::VERSION::MAJOR == 3 # Kthxbye is a light-weight, distributed delayed-job processor. It is meant # to take the load of long running processes off of your web-server any place # it in the capable hands of a backend processor. It is unique in that it # provides results delievery post-processing so that your front-end can actually # have its results back. # # Most delayed job processors out there are meant for running processes that we # don't need to get a result from. For example mass-mailings or image processing. # However I've found many a case in which I need the result of a long process but # my web server is bogging down watiting for a response. # # And this is where Kthxbye shines. # # Okay, you say, enough glowing, let's see this thinger work. Simple enough. # # On your front-end (e.g. a Rails app) initialization process, run # Kthxbye::Config.setup(:redis_server => "localhost", :redis_port => 9876) # # Then somewhere accessable to both your app and your workers (maybe a separate # job class file in you /lib dir) create your job class with a class method # perform: # class MyJob # def self.perform(param1, param2) # puts "I'm here with data!" # # perform stuff with data # result = param1 + param2 # return result # end # end # # Then all that's left is to queue up the job at the appropriate time: # Kthxbye.enqueue("job-queue", MyJob, 6, 4) # # Your job is now queued for processing! To work this job, its even simpler. # require 'whatever/file/my_job/class/is/in' # worker = Kthxbye::Worker.new("job-queue") # worker.run # # Let the processing commence! module Kthxbye include Helper extend self # This is not necessary to call. Any of the methods that use redis # Will make an inital call to connect to redis. # Useful if you want to connect to an existing redis instance. Othewise, if # called without params, it simply connects a new instance of redis. def connect( redis_instance=nil ) @redis = ( redis_instance || Redis.new( :host => Config.options[:redis_server], :port => Config.options[:redis_port] ) ) end # Returns the Redis instance for direct calls to the Redis db def redis return @redis if @redis Config.setup self.connect self.redis end # Returns a hash of all existing Redis keys. def keys redis.keys("*") end # Queues jobs. Takes at minimum two paramters # 1) A string representing a queue # 2) The class of the job being queued. # # You can optionally pass in additional params to the perform method within # the class. You will need to match the number of args in the perform method # when you queue the job. Otherwise this will throw an exception. def enqueue(queue, klass, *args) Job.create(queue, klass, *args) end # Takes a string that represents a job queue. # Returns the size of the given queue. def size(queue) redis.llen("queue:#{queue}").to_i end # This method is used mostly internally to pop the next job off of the given # queue. It takes in a string representing a queue and will return a # Kthxbye::Job object if a job exists on the queue. This is destructive on # the queue as it will REMOVE the next job off queue and return the job object. def salvage(q) id = redis.lpop( "queue:#{q}" ) if id payload = decode( redis.hget( "data-store:#{q}", id ) ) return Job.new(id, q, payload) else log "No jobs found in #{q}" return nil end end # This is a generic queue peek method. It isn't used directly but is the basis # for the "ghost" methods "data_peek" and "result_peek". This method takes in # a string representing a redis hash store (only two in kthxbye: "data-store" # and "result-store"), a string representing a queue, and optionally a job id. # If a job id is given, it will return the data for that job only. Otherwise # it returns all the data for all jobs/results. def peek(store, queue, id=nil) if id decode( redis.hget( "#{store}-store:#{queue}", id ) ) else all = redis.hgetall( "#{store}-store:#{queue}" ) results = {} all.each {|k,v| results[k] = decode( v ) } return results end end def method_missing(name, *args) #:nodoc: method_name = name.id2name if method_name =~ /^(data|result)_peek$/ Kthxbye.send(:peek, $1, *args) else super end end # Returns all the queues Kthxbye knows about def queues redis.smembers( :queues ).sort end # This method takes a string and registers it as a queue in our "known queues" # list def register_queue(queue) redis.sadd(:queues, queue) unless redis.sismember(:queues, queue) end # Removes the queue from the active queue listing, does not delete queue. # This will lead to phantom queues. use delete_queue for complete removal # of queue. def unregister_queue(queue) redis.srem(:queues, queue) end # Completely removes queue: Unregisters it then deletes it should return true # in all cases (including if we try to delete a non-existent queue). # Note: also deletes the data and result stores for this queue. def delete_queue(queue) unregister_queue(queue) redis.del( "queue:#{queue}" ) redis.del( "data-store:#{queue}" ) redis.del( "result-store:#{queue}" ) true end # Returns all workers registered with Kthxbye by the Kthxbye::Worker class. # Special note: Workers are only registered once you call #run on the worker. # You may also run #register_worker on the worker to manually register it, but # this also occurs once the worker is run so there is no need to run this # manually. def workers workers = redis.smembers( :workers ) workers.map {|x| Worker.find( x ) } end # Returns all of the workers that are currently working a job. # Also returns the job id and started time of the worker as a hash as follows: # # [worker_id, {:job_id, :started}] def working workers = redis.smembers( :working ) data = [] workers.each do |w_id| data << [w_id, decode( redis.get("worker:#{w_id}") )] end return data end # Returns either the job results for a specific job (if id specified). # If a job is not specified, it returns all the job results for the given # queue. def job_results(queue, id=nil) if id decode( redis.hget( "result-store:#{queue}", id ) ) else Array( redis.hgetall( "result-store:#{queue}" ) ) end end # Returns a pretty inspect message about this instance of Kthxbye. def inspect { :version => Version, :keys => keys.size, :workers => workers.size, :working => working.size, :queues => queues.size, :failed => Stats["failures"], :jobs_processed => Stats["processed"], :pending => queues.inject(0) {|m,o| m + size(o)} } end end