require 'qu'
require 'simple_uuid'

module Qu
  module Backend
    # In-process backend that keeps queues and worker registrations in plain
    # Ruby hashes. Nothing is persisted; all state lives on this instance.
    #
    # Storage layout:
    #   @workers - Hash of worker id => attributes hash
    #   @queues  - Hash of queue name => Array of payloads, except the
    #              "failed" queue, which is a Hash of payload id => payload
    class Memory < Base
      def initialize
        @workers = Hash.new
        # Queues are created lazily on first access. "failed" is keyed by
        # payload id so requeue can look a job up directly.
        @queues = Hash.new { |h, k| h[k] = (k == "failed" ? {} : []) }
      end

      # There is no external connection to manage; memoizes and returns true.
      def connection
        @connection ||= true
      end

      # Names of all non-empty queues, excluding "failed".
      #
      # @return [Array] queue names
      def queues
        @queues.keys.reject { |queue| @queues[queue].empty? || queue == "failed" }
      end

      # Number of entries currently held in +queue+.
      #
      # @param queue [String] queue name
      # @return [Integer]
      def length(queue = 'default')
        @queues[queue].length
      end

      # Removes the given queue(s). With no argument, removes every non-empty
      # queue plus "failed".
      #
      # @param queue [String, Array<String>, nil] queue name(s) to clear
      def clear(queue = nil)
        queue ||= queues + ['failed']
        logger.info { "Clearing queues: #{queue.inspect}" }
        Array(queue).each do |q|
          logger.debug "Clearing queue #{q}"
          @queues.delete(q)
        end
      end

      # Stores the worker's attributes (plus its id) under the worker's id.
      def register_worker(worker)
        logger.debug "Registering worker #{worker.id}"
        @workers[worker.id] = worker.attributes.merge(:id => worker.id)
      end

      # Drops the registration for +worker+.
      def unregister_worker(worker)
        logger.debug "Unregistering worker #{worker.id}"
        @workers.delete(worker.id)
      end

      # Forgets every registered worker.
      def clear_workers
        @workers = Hash.new
      end

      # Builds a Qu::Worker for each registered worker.
      #
      # @return [Array<Qu::Worker>]
      def workers
        # Iterate the stored attribute hashes only. Mapping over the Hash
        # itself would yield [id, attributes] pairs rather than the
        # attributes that register_worker stored.
        @workers.values.map { |attributes| Qu::Worker.new(attributes) }
      end

      # Assigns a fresh id to +payload+ and appends it to its queue.
      #
      # @return the payload
      def enqueue(payload)
        payload.id = SimpleUUID::UUID.new.to_guid
        @queues[payload.queue] << payload
        logger.debug { "Enqueued job #{payload}" }
        payload
      end

      # Assigns a fresh id and the given +time+ to +payload+ and appends it to
      # the scheduled queue for the payload's class.
      #
      # @return the payload
      def delayed_push(time, payload)
        payload.id = SimpleUUID::UUID.new.to_guid
        payload.time = time
        @queues[scheduled_queue_name(payload.klass)] << payload
        logger.debug { "Enqueued delayed job #{payload}" }
        payload
      end

      # Deletes scheduled payloads for +klass+ whose args equal +args+.
      # Classes are compared by their string form.
      def remove_delayed(klass, *args)
        get_queue_by_name(scheduled_queue_name(klass)).delete_if do |payload|
          payload.klass.to_s == klass.to_s && payload.args == args
        end
      end

      # Takes the next job from the first non-empty queue of +worker+,
      # checking the queues in the order the worker lists them.
      #
      # @param worker  object responding to #queues
      # @param options [Hash] :block => true (default) keeps polling until a
      #   job is available; a falsy :block makes a single pass
      # @return the reserved payload, or nil when not blocking and every
      #   queue was empty
      def reserve(worker, options = {:block => true})
        loop do
          worker.queues.each do |queue|
            logger.debug { "Reserving job in queue #{queue}" }
            return @queues[queue].shift unless @queues[queue].empty?
          end
          # Honour non-blocking callers: give up after one full pass instead
          # of spinning forever on empty queues.
          return nil unless options[:block]
          # NOTE(review): the blocking path polls continuously with no pause
          # between passes.
        end
      end

      # Records +payload+ in the "failed" queue, keyed by its id.
      # +error+ is accepted but not stored.
      def failed(payload, error)
        @queues["failed"][payload.id] = payload
      end

      # Moves the failed job with the given +id+ back onto its queue.
      #
      # @return the payload, or false when no failed job has that id
      def requeue(id)
        logger.debug "Requeuing job #{id}"
        if payload = @queues["failed"].delete(id)
          @queues[payload.queue] << payload
          payload
        else
          false
        end
      end

      # Puts +payload+ back at the end of its queue.
      def release(payload)
        @queues[payload.queue] << payload
      end

      # Nothing to record for completed jobs in this backend.
      def completed(payload); end

      # Raw storage for the named queue (the name is converted with #to_s).
      def get_queue_by_name(queue = 'default')
        @queues[queue.to_s]
      end

      # Raw storage for the queue that a payload of +klass+ reports as its
      # queue.
      def get_queue_by_klass(klass)
        payload = Payload.new(:klass => klass)
        get_queue_by_name(payload.queue)
      end

      # Raw storage for the scheduled queue of +klass+.
      def get_schedule_by_klass(klass)
        get_queue_by_name(scheduled_queue_name(klass))
      end

      private

      # Name of the queue holding delayed jobs for +klass+.
      def scheduled_queue_name(klass)
        "#{klass.to_s}_scheduled"
      end
    end
  end
end