lib/resque-bus.rb in resque-bus-0.3.7 vs lib/resque-bus.rb in resque-bus-0.5.7

- old
+ new

@@ -1,311 +1,19 @@ +require "queue-bus" +require "resque_bus/adapter" require "resque_bus/version" -require 'redis/namespace' -require 'resque' - -module QueueBus - autoload :Worker, 'resque_bus/compatibility' -end - module ResqueBus - - autoload :Application, 'resque_bus/application' - autoload :Dispatch, 'resque_bus/dispatch' - autoload :Driver, 'resque_bus/driver' - autoload :Heartbeat, 'resque_bus/heartbeat' - autoload :Local, 'resque_bus/local' - autoload :Matcher, 'resque_bus/matcher' - autoload :Publisher, 'resque_bus/publisher' - autoload :Rider, 'resque_bus/rider' - autoload :Subscriber, 'resque_bus/subscriber' - autoload :Subscription, 'resque_bus/subscription' - autoload :SubscriptionList, 'resque_bus/subscription_list' - autoload :TaskManager, 'resque_bus/task_manager' - autoload :Util, 'resque_bus/util' + # TODO: all of this will be removed - class << self - - def default_app_key=val - @default_app_key = Application.normalize(val) - end - - def default_app_key - @default_app_key - end - - def default_queue=val - @default_queue = val - end - - def default_queue - @default_queue - end + autoload :Deprecated, 'resque_bus/compatibility/deprecated' + autoload :Subscriber, 'resque_bus/compatibility/subscriber' + autoload :TaskManager, 'resque_bus/compatibility/task_manager' + autoload :Driver, 'resque_bus/compatibility/driver' + autoload :Rider, 'resque_bus/compatibility/rider' + autoload :Publisher, 'resque_bus/compatibility/publisher' + autoload :Heartbeat, 'resque_bus/compatibility/heartbeat' - def hostname - @hostname ||= `hostname 2>&1`.strip.sub(/.local/,'') - end - - def dispatch(app_key=nil, &block) - dispatcher = dispatcher_by_key(app_key) - dispatcher.instance_eval(&block) - dispatcher - end - - def dispatchers - @dispatchers ||= {} - @dispatchers.values - end - - def dispatcher_by_key(app_key) - app_key = Application.normalize(app_key || default_app_key) - @dispatchers ||= {} - @dispatchers[app_key] ||= Dispatch.new(app_key) - end - - def dispatcher_execute(app_key, key, attributes) - @dispatchers ||= {} - dispatcher = @dispatchers[app_key] - dispatcher.execute(key, attributes) if dispatcher - end + extend ::ResqueBus::Deprecated +end - def local_mode=value - @local_mode = value - end - - def local_mode - @local_mode - end - - def heartbeat! - # turn on the heartbeat - # should be down after loading scheduler yml if you do that - # otherwise, anytime - require 'resque/scheduler' - name = 'resquebus_hearbeat' - schedule = { 'class' => '::ResqueBus::Heartbeat', - 'cron' => '* * * * *', # every minute - 'queue' => incoming_queue, - 'description' => 'I publish a heartbeat_minutes event every minute' - } - if Resque::Scheduler.dynamic - Resque.set_schedule(name, schedule) - end - Resque.schedule[name] = schedule - end - - # Accepts: - # 1. A 'hostname:port' String - # 2. A 'hostname:port:db' String (to select the Redis db) - # 3. A 'hostname:port/namespace' String (to set the Redis namespace) - # 4. A Redis URL String 'redis://host:port' - # 5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`, - # or `Redis::Namespace`. - def redis=(server) - case server - when String - if server =~ /redis\:\/\// - redis = Redis.connect(:url => server, :thread_safe => true) - else - server, namespace = server.split('/', 2) - host, port, db = server.split(':') - redis = Redis.new(:host => host, :port => port, - :thread_safe => true, :db => db) - end - namespace ||= default_namespace - - @redis = Redis::Namespace.new(namespace, :redis => redis) - when Redis::Namespace - @redis = server - else - @redis = Redis::Namespace.new(default_namespace, :redis => server) - end - end - - # Returns the current Redis connection. If none has been created, will - # create a new one from the Reqsue one (with a different namespace) - def redis - return @redis if @redis - copy = Resque.redis.clone - copy.namespace = default_namespace - self.redis = copy - self.redis - end - - def original_redis=(server) - @original_redis = server - end - def original_redis - @original_redis - end - - def with_global_attributes(attributes) - original_timezone = false - original_locale = false - - if attributes["bus_locale"] && defined?(I18n) && I18n.respond_to?(:locale=) - original_locale = I18n.locale if I18n.respond_to?(:locale) - I18n.locale = attributes["bus_locale"] - end - - if attributes["bus_timezone"] && defined?(Time) && Time.respond_to?(:zone=) - original_timezone = Time.zone if Time.respond_to?(:zone) - Time.zone = attributes["bus_timezone"] - end - - yield - ensure - I18n.locale = original_locale unless original_locale == false - Time.zone = original_timezone unless original_timezone == false - end - - def before_publish=(proc) - @before_publish_callback = proc - end - - def before_publish_callback(attributes) - if @before_publish_callback - @before_publish_callback.call(attributes) - end - end - - - def publish_metadata(event_type, attributes={}) - # TODO: "bus_app_key" => application.app_key ? - bus_attr = {"bus_published_at" => Time.now.to_i, "bus_event_type" => event_type} - bus_attr["bus_id"] = "#{Time.now.to_i}-#{generate_uuid}" - bus_attr["bus_app_hostname"] = hostname - bus_attr["bus_locale"] = I18n.locale.to_s if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale - bus_attr["bus_timezone"] = Time.zone.name if defined?(Time) && Time.respond_to?(:zone) && Time.zone - out = bus_attr.merge(attributes || {}) - ResqueBus.before_publish_callback(out) - out - end - - def generate_uuid - require 'securerandom' unless defined?(SecureRandom) - return SecureRandom.uuid - - rescue Exception => e - # secure random not there - # big random number a few times - n_bytes = [42].pack('i').size - n_bits = n_bytes * 8 - max = 2 ** (n_bits - 2) - 1 - return "#{rand(max)}-#{rand(max)}-#{rand(max)}" - end - - def publish(event_type, attributes = {}) - to_publish = publish_metadata(event_type, attributes) - ResqueBus.log_application("Event published: #{event_type} #{to_publish.inspect}") - if local_mode - ResqueBus::Local.perform(to_publish) - else - enqueue_to(incoming_queue, Driver, to_publish) - end - end - - def publish_at(timestamp_or_epoch, event_type, attributes = {}) - to_publish = publish_metadata(event_type, attributes) - to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i - to_publish.delete("bus_published_at") unless attributes["bus_published_at"] # will be put on when it actually does it - - ResqueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}") - item = delayed_job_to_hash_with_queue(incoming_queue, Publisher, [event_type, to_publish]) - delayed_push(timestamp_or_epoch, item) - end - - def enqueue_to(queue, klass, *args) - push(queue, :class => klass.to_s, :args => args) - end - - def logger - @logger - end - - def logger=val - @logger = val - end - - def log_application(message) - if logger - time = Time.now.strftime('%H:%M:%S %Y-%m-%d') - logger.info("** [#{time}] #$$: ResqueBus #{message}") - end - end - - def log_worker(message) - if ENV['LOGGING'] || ENV['VERBOSE'] || ENV['VVERBOSE'] - time = Time.now.strftime('%H:%M:%S %Y-%m-%d') - puts "** [#{time}] #$$: #{message}" - end - end - - protected - - def reset - # used by tests - @redis = nil # clear instance of redis - @dispatcher = nil - @default_app_key = nil - @default_queue = nil - @before_publish_callback = nil - end - - def incoming_queue - "resquebus_incoming" - end - - def default_namespace - # It might play better on the same server, but overall life is more complicated - :resque - end - - ## From Resque, but using a (possibly) different instance of Redis - - # Pushes a job onto a queue. Queue name should be a string and the - # item should be any JSON-able Ruby object. - # - # Resque works generally expect the `item` to be a hash with the following - # keys: - # - # class - The String name of the job to run. - # args - An Array of arguments to pass the job. Usually passed - # via `class.to_class.perform(*args)`. - # - # Example - # - # Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ]) - # - # Returns nothing - def push(queue, item) - watch_queue(queue) - redis.rpush "queue:#{queue}", Resque.encode(item) - end - - # Used internally to keep track of which queues we've created. - # Don't call this directly. - def watch_queue(queue) - redis.sadd(:queues, queue.to_s) - end - - ### From Resque Scheduler - # Used internally to stuff the item into the schedule sorted list. - # +timestamp+ can be either in seconds or a datetime object - # Insertion if O(log(n)). - # Returns true if it's the first job to be scheduled at that time, else false - def delayed_push(timestamp, item) - # First add this item to the list for this timestamp - redis.rpush("delayed:#{timestamp.to_i}", Resque.encode(item)) - - # Now, add this timestamp to the zsets. The score and the value are - # the same since we'll be querying by timestamp, and we don't have - # anything else to store. - redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i - end - - def delayed_job_to_hash_with_queue(queue, klass, args) - {:class => klass.to_s, :args => args, :queue => queue} - end - end - -end \ No newline at end of file +QueueBus.adapter = QueueBus::Adapters::Resque.new