lib/cloudist.rb in cloudist-0.2.1 vs lib/cloudist.rb in cloudist-0.4.1

- old
+ new

@@ -1,36 +1,41 @@ require 'uri' require 'json' unless defined? ActiveSupport::JSON -require "active_support/hash_with_indifferent_access" require "amqp" -require "mq" +require "hashie" require "logger" require "digest/md5" +require "uuid" $:.unshift File.dirname(__FILE__) + +require "em/em_timer_utils" require "cloudist/core_ext/string" require "cloudist/core_ext/object" +require "cloudist/core_ext/class" require "cloudist/errors" require "cloudist/utils" +require "cloudist/encoding" require "cloudist/queues/basic_queue" require "cloudist/queues/job_queue" require "cloudist/queues/reply_queue" require "cloudist/publisher" require "cloudist/payload" require "cloudist/request" -require "cloudist/callback_methods" require "cloudist/listener" -require "cloudist/callback" -require "cloudist/callbacks/error_callback" require "cloudist/job" require "cloudist/worker" module Cloudist + DEFAULT_TTL = 300 + class << self + thread_local_accessor :channels, :default => {} + thread_local_accessor :workers, :default => {} + thread_local_accessor :listeners, :default => [] + thread_local_accessor :listener_instances, :default => {} - @@workers = {} - # Start the Cloudist loop # # Cloudist.start { # # Do stuff in here # } @@ -39,27 +44,38 @@ # * :user => 'name' # * :pass => 'secret' # * :host => 'localhost' # * :port => 5672 # * :vhost => / + # * :heartbeat => 0 + # * :logging => false # # Refer to default config below for how to set these as defaults # def start(options = {}, &block) config = settings.update(options) AMQP.start(config) do - self.instance_eval(&block) + self.instance_eval(&block) if block_given? end end + + def connection + AMQP.connection + end + + def connection=(conn) + AMQP.connection = conn + end # Define a worker. Must be called inside start loop # # worker { # job('make.sandwich') {} # } # # REMOVED + # def worker(&block) raise NotImplementedError, "This DSL format has been removed. Please use job('make.sandwich') {} instead." end # Defines a job handler (GenericWorker) @@ -71,12 +87,17 @@ # job.finished! # } # # Refer to sandwich_worker.rb example # - def job(queue_name, &block) - register_worker(queue_name, &block) + def job(queue_name) + if block_given? + block = Proc.new + register_worker(queue_name, &block) + else + raise ArgumentError, "You must supply a block as the last argument" + end end # Registers a worker class to handle a specific queue # # Cloudist.handle('make.sandwich', 'eat.sandwich').with(MyWorker) @@ -105,85 +126,110 @@ def register_worker(queue_name, klass = nil, &block) job_queue = JobQueue.new(queue_name) job_queue.subscribe do |request| j = Job.new(request.payload.dup) - EM.defer do - begin - if block_given? - worker_instance = GenericWorker.new(j, job_queue.q) - worker_instance.process(&block) - elsif klass - worker_instance = klass.new(j, job_queue.q) - worker_instance.process - else - raise RuntimeError, "Failed to register worker, I need either a handler class or block." - end - finished = Time.now.utc.to_i - log.debug("Finished Job in #{finished - request.start} seconds") - - rescue Exception => e - j.handle_error(e) + begin + if block_given? + worker_instance = GenericWorker.new(j, job_queue.q) + worker_instance.process(&block) + elsif klass + worker_instance = klass.new(j, job_queue.q) + worker_instance.process + else + raise RuntimeError, "Failed to register worker, I need either a handler class or block." end + rescue Exception => e + j.handle_error(e) + ensure + finished = Time.now.utc.to_f + log.debug("Finished Job in #{finished - request.start} seconds") + j.reply({:runtime => (finished - request.start)}, {:message_type => 'runtime'}) + j.cleanup end - j.cleanup end - ((@@workers[queue_name.to_s] ||= []) << job_queue).uniq! + ((self.workers[queue_name.to_s] ||= []) << job_queue).uniq! end # Accepts either a queue name or a job instance returned from enqueue. # This method operates in two modes, when given a queue name, it # will return all responses regardless of job id so you can use the job # id to lookup a database record to update etc. # When given a job instance it will only return messages from that job. + # + # DEPRECATED + # def listen(*queue_names, &block) - @@listeners ||= [] - queue_names.each do |job_or_queue_name| - _listener = Cloudist::Listener.new(job_or_queue_name) - _listener.subscribe(&block) - @@listeners << _listener + raise NotImplementedError, "This DSL method has been removed. Please use add_listener" + end + + # Adds a listener class + def add_listener(klass) + raise ArgumentError, "Your listener must extend Cloudist::Listener" unless klass.superclass == Cloudist::Listener + raise ArgumentError, "Your listener must declare at least one queue to listen to. Use listen_to 'queue.name'" if klass.job_queue_names.nil? + + klass.job_queue_names.each do |queue_name| + klass.subscribe(queue_name) end - return @@listeners + + self.listeners << klass + + return self.listeners end # Enqueues a job. # Takes a queue name and data hash to be sent to the worker. # Returns Job instance # Use Job#id to reference job later on. def enqueue(job_queue_name, data = nil) - raise EnqueueError, "Incorrect arguments, you must include data when enquing job" if data.nil? + raise EnqueueError, "Incorrect arguments, you must include data when enqueuing job" if data.nil? # TODO: Detect if inside loop, if not use bunny sync Cloudist::Publisher.enqueue(job_queue_name, data) end + + # Send a reply synchronously + # This uses bunny instead of AMQP and as such can be run outside + # of EventMachine and the Cloudist start loop. + # + # Usage: Cloudist.reply('make.sandwich', {:sandwhich_id => 12345}) + # + # def reply(queue_name, job_id, data, options = {}) + # headers = { + # :message_id => job_id, + # :message_type => "reply", + # # :event => 'working', + # :message_type => 'reply' + # }.update(options) + # + # payload = Cloudist::Payload.new(data, headers) + # + # queue = Cloudist::SyncReplyQueue.new(queue_name) + # + # queue.setup + # queue.publish_to_q(payload) + # end # Call this at anytime inside the loop to exit the app. def stop_safely - # ::EM.add_timer(0.2) { - ::AMQP.stop { - ::EM.stop + if EM.reactor_running? + ::EM.add_timer(0.2) { + ::AMQP.stop { + ::EM.stop + puts "\n" + } } - # } + end end alias :stop :stop_safely - - def closing? - ::AMQP.closing? - end - - def log - @@log ||= Logger.new($stdout) - end - def log=(log) - @@log = log - end - def handle_error(e) log.error "#{e.class}: #{e.message}"#, :exception => e - log.error e.backtrace.join("\n") + e.backtrace.each do |line| + log.error line + end end def version @@version ||= File.read(File.dirname(__FILE__) + '/../VERSION').strip end @@ -193,11 +239,13 @@ { :vhost => uri.path, :host => uri.host, :user => uri.user, :port => uri.port || 5672, - :pass => uri.password + :pass => uri.password, + :heartbeat => 0, + :logging => false } rescue Object => e raise "invalid AMQP_URL: (#{uri.inspect}) #{e.class} -> #{e.message}" end @@ -212,16 +260,25 @@ def signal_trap! ::Signal.trap('INT') { Cloudist.stop } ::Signal.trap('TERM'){ Cloudist.stop } end - def workers - @@workers + def log + @@log ||= Logger.new($stdout) end + + def log=(log) + @@log = log + end + alias :install_signal_trap :signal_trap! + def remove_workers - @@workers = {} + self.workers.keys.each do |worker| + self.workers.delete(worker) + end end end + include Cloudist::EMTimerUtils end \ No newline at end of file