lib/resque-bus.rb in resque-bus-0.3.7 vs lib/resque-bus.rb in resque-bus-0.5.7
- old
+ new
@@ -1,311 +1,19 @@
+require "queue-bus"
+require "resque_bus/adapter"
require "resque_bus/version"
-require 'redis/namespace'
-require 'resque'
-
-module QueueBus
- autoload :Worker, 'resque_bus/compatibility'
-end
-
module ResqueBus
-
- autoload :Application, 'resque_bus/application'
- autoload :Dispatch, 'resque_bus/dispatch'
- autoload :Driver, 'resque_bus/driver'
- autoload :Heartbeat, 'resque_bus/heartbeat'
- autoload :Local, 'resque_bus/local'
- autoload :Matcher, 'resque_bus/matcher'
- autoload :Publisher, 'resque_bus/publisher'
- autoload :Rider, 'resque_bus/rider'
- autoload :Subscriber, 'resque_bus/subscriber'
- autoload :Subscription, 'resque_bus/subscription'
- autoload :SubscriptionList, 'resque_bus/subscription_list'
- autoload :TaskManager, 'resque_bus/task_manager'
- autoload :Util, 'resque_bus/util'
+ # TODO: all of this will be removed
- class << self
-
- def default_app_key=val
- @default_app_key = Application.normalize(val)
- end
-
- def default_app_key
- @default_app_key
- end
-
- def default_queue=val
- @default_queue = val
- end
-
- def default_queue
- @default_queue
- end
+ autoload :Deprecated, 'resque_bus/compatibility/deprecated'
+ autoload :Subscriber, 'resque_bus/compatibility/subscriber'
+ autoload :TaskManager, 'resque_bus/compatibility/task_manager'
+ autoload :Driver, 'resque_bus/compatibility/driver'
+ autoload :Rider, 'resque_bus/compatibility/rider'
+ autoload :Publisher, 'resque_bus/compatibility/publisher'
+ autoload :Heartbeat, 'resque_bus/compatibility/heartbeat'
- def hostname
- @hostname ||= `hostname 2>&1`.strip.sub(/.local/,'')
- end
-
- def dispatch(app_key=nil, &block)
- dispatcher = dispatcher_by_key(app_key)
- dispatcher.instance_eval(&block)
- dispatcher
- end
-
- def dispatchers
- @dispatchers ||= {}
- @dispatchers.values
- end
-
- def dispatcher_by_key(app_key)
- app_key = Application.normalize(app_key || default_app_key)
- @dispatchers ||= {}
- @dispatchers[app_key] ||= Dispatch.new(app_key)
- end
-
- def dispatcher_execute(app_key, key, attributes)
- @dispatchers ||= {}
- dispatcher = @dispatchers[app_key]
- dispatcher.execute(key, attributes) if dispatcher
- end
+ extend ::ResqueBus::Deprecated
+end
- def local_mode=value
- @local_mode = value
- end
-
- def local_mode
- @local_mode
- end
-
- def heartbeat!
- # turn on the heartbeat
- # should be down after loading scheduler yml if you do that
- # otherwise, anytime
- require 'resque/scheduler'
- name = 'resquebus_hearbeat'
- schedule = { 'class' => '::ResqueBus::Heartbeat',
- 'cron' => '* * * * *', # every minute
- 'queue' => incoming_queue,
- 'description' => 'I publish a heartbeat_minutes event every minute'
- }
- if Resque::Scheduler.dynamic
- Resque.set_schedule(name, schedule)
- end
- Resque.schedule[name] = schedule
- end
-
- # Accepts:
- # 1. A 'hostname:port' String
- # 2. A 'hostname:port:db' String (to select the Redis db)
- # 3. A 'hostname:port/namespace' String (to set the Redis namespace)
- # 4. A Redis URL String 'redis://host:port'
- # 5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
- # or `Redis::Namespace`.
- def redis=(server)
- case server
- when String
- if server =~ /redis\:\/\//
- redis = Redis.connect(:url => server, :thread_safe => true)
- else
- server, namespace = server.split('/', 2)
- host, port, db = server.split(':')
- redis = Redis.new(:host => host, :port => port,
- :thread_safe => true, :db => db)
- end
- namespace ||= default_namespace
-
- @redis = Redis::Namespace.new(namespace, :redis => redis)
- when Redis::Namespace
- @redis = server
- else
- @redis = Redis::Namespace.new(default_namespace, :redis => server)
- end
- end
-
- # Returns the current Redis connection. If none has been created, will
- # create a new one from the Reqsue one (with a different namespace)
- def redis
- return @redis if @redis
- copy = Resque.redis.clone
- copy.namespace = default_namespace
- self.redis = copy
- self.redis
- end
-
- def original_redis=(server)
- @original_redis = server
- end
- def original_redis
- @original_redis
- end
-
- def with_global_attributes(attributes)
- original_timezone = false
- original_locale = false
-
- if attributes["bus_locale"] && defined?(I18n) && I18n.respond_to?(:locale=)
- original_locale = I18n.locale if I18n.respond_to?(:locale)
- I18n.locale = attributes["bus_locale"]
- end
-
- if attributes["bus_timezone"] && defined?(Time) && Time.respond_to?(:zone=)
- original_timezone = Time.zone if Time.respond_to?(:zone)
- Time.zone = attributes["bus_timezone"]
- end
-
- yield
- ensure
- I18n.locale = original_locale unless original_locale == false
- Time.zone = original_timezone unless original_timezone == false
- end
-
- def before_publish=(proc)
- @before_publish_callback = proc
- end
-
- def before_publish_callback(attributes)
- if @before_publish_callback
- @before_publish_callback.call(attributes)
- end
- end
-
-
- def publish_metadata(event_type, attributes={})
- # TODO: "bus_app_key" => application.app_key ?
- bus_attr = {"bus_published_at" => Time.now.to_i, "bus_event_type" => event_type}
- bus_attr["bus_id"] = "#{Time.now.to_i}-#{generate_uuid}"
- bus_attr["bus_app_hostname"] = hostname
- bus_attr["bus_locale"] = I18n.locale.to_s if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale
- bus_attr["bus_timezone"] = Time.zone.name if defined?(Time) && Time.respond_to?(:zone) && Time.zone
- out = bus_attr.merge(attributes || {})
- ResqueBus.before_publish_callback(out)
- out
- end
-
- def generate_uuid
- require 'securerandom' unless defined?(SecureRandom)
- return SecureRandom.uuid
-
- rescue Exception => e
- # secure random not there
- # big random number a few times
- n_bytes = [42].pack('i').size
- n_bits = n_bytes * 8
- max = 2 ** (n_bits - 2) - 1
- return "#{rand(max)}-#{rand(max)}-#{rand(max)}"
- end
-
- def publish(event_type, attributes = {})
- to_publish = publish_metadata(event_type, attributes)
- ResqueBus.log_application("Event published: #{event_type} #{to_publish.inspect}")
- if local_mode
- ResqueBus::Local.perform(to_publish)
- else
- enqueue_to(incoming_queue, Driver, to_publish)
- end
- end
-
- def publish_at(timestamp_or_epoch, event_type, attributes = {})
- to_publish = publish_metadata(event_type, attributes)
- to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i
- to_publish.delete("bus_published_at") unless attributes["bus_published_at"] # will be put on when it actually does it
-
- ResqueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}")
- item = delayed_job_to_hash_with_queue(incoming_queue, Publisher, [event_type, to_publish])
- delayed_push(timestamp_or_epoch, item)
- end
-
- def enqueue_to(queue, klass, *args)
- push(queue, :class => klass.to_s, :args => args)
- end
-
- def logger
- @logger
- end
-
- def logger=val
- @logger = val
- end
-
- def log_application(message)
- if logger
- time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
- logger.info("** [#{time}] #$$: ResqueBus #{message}")
- end
- end
-
- def log_worker(message)
- if ENV['LOGGING'] || ENV['VERBOSE'] || ENV['VVERBOSE']
- time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
- puts "** [#{time}] #$$: #{message}"
- end
- end
-
- protected
-
- def reset
- # used by tests
- @redis = nil # clear instance of redis
- @dispatcher = nil
- @default_app_key = nil
- @default_queue = nil
- @before_publish_callback = nil
- end
-
- def incoming_queue
- "resquebus_incoming"
- end
-
- def default_namespace
- # It might play better on the same server, but overall life is more complicated
- :resque
- end
-
- ## From Resque, but using a (possibly) different instance of Redis
-
- # Pushes a job onto a queue. Queue name should be a string and the
- # item should be any JSON-able Ruby object.
- #
- # Resque works generally expect the `item` to be a hash with the following
- # keys:
- #
- # class - The String name of the job to run.
- # args - An Array of arguments to pass the job. Usually passed
- # via `class.to_class.perform(*args)`.
- #
- # Example
- #
- # Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ])
- #
- # Returns nothing
- def push(queue, item)
- watch_queue(queue)
- redis.rpush "queue:#{queue}", Resque.encode(item)
- end
-
- # Used internally to keep track of which queues we've created.
- # Don't call this directly.
- def watch_queue(queue)
- redis.sadd(:queues, queue.to_s)
- end
-
- ### From Resque Scheduler
- # Used internally to stuff the item into the schedule sorted list.
- # +timestamp+ can be either in seconds or a datetime object
- # Insertion if O(log(n)).
- # Returns true if it's the first job to be scheduled at that time, else false
- def delayed_push(timestamp, item)
- # First add this item to the list for this timestamp
- redis.rpush("delayed:#{timestamp.to_i}", Resque.encode(item))
-
- # Now, add this timestamp to the zsets. The score and the value are
- # the same since we'll be querying by timestamp, and we don't have
- # anything else to store.
- redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
- end
-
- def delayed_job_to_hash_with_queue(queue, klass, args)
- {:class => klass.to_s, :args => args, :queue => queue}
- end
- end
-
-end
\ No newline at end of file
+QueueBus.adapter = QueueBus::Adapters::Resque.new