lib/resque-bus.rb in resque-bus-0.2.4 vs lib/resque-bus.rb in resque-bus-0.2.5
- old
+ new
@@ -1,250 +1,254 @@
+require "resque_bus/version"
+
require 'redis/namespace'
require 'resque'
-require "resque_bus/version"
-require 'resque_bus/util'
-require 'resque_bus/matcher'
-require 'resque_bus/subscription'
-require 'resque_bus/subscription_list'
-require 'resque_bus/subscriber'
-require 'resque_bus/application'
-require 'resque_bus/publisher'
-require 'resque_bus/driver'
-require 'resque_bus/local'
-require 'resque_bus/rider'
-require 'resque_bus/dispatch'
-require 'resque_bus/task_manager'
module ResqueBus
- extend self
- def default_app_key=val
- @default_app_key = Application.normalize(val)
- end
-
- def default_app_key
- @default_app_key
- end
-
- def default_queue=val
- @default_queue = val
- end
-
- def default_queue
- @default_queue
- end
+ autoload :Application, 'resque_bus/application'
+ autoload :Dispatch, 'resque_bus/dispatch'
+ autoload :Driver, 'resque_bus/driver'
+ autoload :Local, 'resque_bus/local'
+ autoload :Matcher, 'resque_bus/matcher'
+ autoload :Publisher, 'resque_bus/publisher'
+ autoload :Rider, 'resque_bus/rider'
+ autoload :Subscriber, 'resque_bus/subscriber'
+ autoload :Subscription, 'resque_bus/subscription'
+ autoload :SubscriptionList, 'resque_bus/subscription_list'
+ autoload :TaskManager, 'resque_bus/task_manager'
+ autoload :Util, 'resque_bus/util'
- def hostname
- @hostname ||= `hostname 2>&1`.strip.sub(/.local/,'')
- end
-
- def dispatch(app_key=nil, &block)
- dispatcher = dispatcher_by_key(app_key)
- dispatcher.instance_eval(&block)
- dispatcher
- end
-
- def dispatchers
- @dispatchers ||= {}
- @dispatchers.values
- end
-
- def dispatcher_by_key(app_key)
- app_key = Application.normalize(app_key || default_app_key)
- @dispatchers ||= {}
- @dispatchers[app_key] ||= Dispatch.new(app_key)
- end
-
- def dispatcher_execute(app_key, key, attributes)
- @dispatchers ||= {}
- dispatcher = @dispatchers[app_key]
- dispatcher.execute(key, attributes) if dispatcher
- end
+ class << self
+
+ def default_app_key=val
+ @default_app_key = Application.normalize(val)
+ end
+
+ def default_app_key
+ @default_app_key
+ end
+
+ def default_queue=val
+ @default_queue = val
+ end
+
+ def default_queue
+ @default_queue
+ end
- def local_mode=value
- @local_mode = value
- end
+ def hostname
+ @hostname ||= `hostname 2>&1`.strip.sub(/.local/,'')
+ end
+
+ def dispatch(app_key=nil, &block)
+ dispatcher = dispatcher_by_key(app_key)
+ dispatcher.instance_eval(&block)
+ dispatcher
+ end
+
+ def dispatchers
+ @dispatchers ||= {}
+ @dispatchers.values
+ end
+
+ def dispatcher_by_key(app_key)
+ app_key = Application.normalize(app_key || default_app_key)
+ @dispatchers ||= {}
+ @dispatchers[app_key] ||= Dispatch.new(app_key)
+ end
+
+ def dispatcher_execute(app_key, key, attributes)
+ @dispatchers ||= {}
+ dispatcher = @dispatchers[app_key]
+ dispatcher.execute(key, attributes) if dispatcher
+ end
- def local_mode
- @local_mode
- end
+ def local_mode=value
+ @local_mode = value
+ end
- # Accepts:
- # 1. A 'hostname:port' String
- # 2. A 'hostname:port:db' String (to select the Redis db)
- # 3. A 'hostname:port/namespace' String (to set the Redis namespace)
- # 4. A Redis URL String 'redis://host:port'
- # 5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
- # or `Redis::Namespace`.
- def redis=(server)
- case server
- when String
- if server =~ /redis\:\/\//
- redis = Redis.connect(:url => server, :thread_safe => true)
+ def local_mode
+ @local_mode
+ end
+
+ # Accepts:
+ # 1. A 'hostname:port' String
+ # 2. A 'hostname:port:db' String (to select the Redis db)
+ # 3. A 'hostname:port/namespace' String (to set the Redis namespace)
+ # 4. A Redis URL String 'redis://host:port'
+ # 5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
+ # or `Redis::Namespace`.
+ def redis=(server)
+ case server
+ when String
+ if server =~ /redis\:\/\//
+ redis = Redis.connect(:url => server, :thread_safe => true)
+ else
+ server, namespace = server.split('/', 2)
+ host, port, db = server.split(':')
+ redis = Redis.new(:host => host, :port => port,
+ :thread_safe => true, :db => db)
+ end
+ namespace ||= default_namespace
+
+ @redis = Redis::Namespace.new(namespace, :redis => redis)
+ when Redis::Namespace
+ @redis = server
else
- server, namespace = server.split('/', 2)
- host, port, db = server.split(':')
- redis = Redis.new(:host => host, :port => port,
- :thread_safe => true, :db => db)
+ @redis = Redis::Namespace.new(default_namespace, :redis => server)
end
- namespace ||= default_namespace
-
- @redis = Redis::Namespace.new(namespace, :redis => redis)
- when Redis::Namespace
- @redis = server
- else
- @redis = Redis::Namespace.new(default_namespace, :redis => server)
end
- end
- # Returns the current Redis connection. If none has been created, will
- # create a new one from the Reqsue one (with a different namespace)
- def redis
- return @redis if @redis
- copy = Resque.redis.clone
- copy.namespace = default_namespace
- self.redis = copy
- self.redis
- end
-
- def original_redis=(server)
- @original_redis = server
- end
- def original_redis
- @original_redis
- end
-
- def publish_metadata(event_type, attributes={})
- # TODO: "bus_app_key" => application.app_key ?
- bus_attr = {"bus_published_at" => Time.now.to_i, "created_at" => Time.now.to_i, "bus_event_type" => event_type}
- bus_attr["bus_id"] ||= "#{Time.now.to_i}-#{generate_uuid}"
- bus_attr["bus_app_hostname"] = hostname
- bus_attr.merge(attributes || {})
- end
-
- def generate_uuid
- require 'securerandom' unless defined?(SecureRandom)
- return SecureRandom.uuid
+ # Returns the current Redis connection. If none has been created, will
+ # create a new one from the Reqsue one (with a different namespace)
+ def redis
+ return @redis if @redis
+ copy = Resque.redis.clone
+ copy.namespace = default_namespace
+ self.redis = copy
+ self.redis
+ end
- rescue Exception => e
- # secure random not there
- # big random number a few times
- n_bytes = [42].pack('i').size
- n_bits = n_bytes * 8
- max = 2 ** (n_bits - 2) - 1
- return "#{rand(max)}-#{rand(max)}-#{rand(max)}"
- end
-
- def publish(event_type, attributes = {})
- to_publish = publish_metadata(event_type, attributes)
- ResqueBus.log_application("Event published: #{event_type} #{to_publish.inspect}")
- if local_mode
- ResqueBus::Local.perform(to_publish)
- else
- enqueue_to(incoming_queue, Driver, to_publish)
+ def original_redis=(server)
+ @original_redis = server
end
- end
-
- def publish_at(timestamp_or_epoch, event_type, attributes = {})
- to_publish = publish_metadata(event_type, attributes)
- to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i
- to_publish.delete("bus_published_at") unless attributes["bus_published_at"] # will be put on when it actually does it
+ def original_redis
+ @original_redis
+ end
- ResqueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}")
- item = delayed_job_to_hash_with_queue(incoming_queue, Publisher, [event_type, to_publish])
- delayed_push(timestamp_or_epoch, item)
- end
-
- def enqueue_to(queue, klass, *args)
- push(queue, :class => klass.to_s, :args => args)
- end
-
- def logger
- @logger
- end
-
- def logger=val
- @logger = val
- end
-
- def log_application(message)
- if logger
- time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
- logger.info("** [#{time}] #$$: ResqueBus #{message}")
+ def publish_metadata(event_type, attributes={})
+ # TODO: "bus_app_key" => application.app_key ?
+ bus_attr = {"bus_published_at" => Time.now.to_i, "created_at" => Time.now.to_i, "bus_event_type" => event_type}
+ bus_attr["bus_id"] ||= "#{Time.now.to_i}-#{generate_uuid}"
+ bus_attr["bus_app_hostname"] = hostname
+ bus_attr.merge(attributes || {})
end
- end
-
- def log_worker(message)
- if ENV['LOGGING'] || ENV['VERBOSE'] || ENV['VVERBOSE']
- time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
- puts "** [#{time}] #$$: #{message}"
+
+ def generate_uuid
+ require 'securerandom' unless defined?(SecureRandom)
+ return SecureRandom.uuid
+
+ rescue Exception => e
+ # secure random not there
+ # big random number a few times
+ n_bytes = [42].pack('i').size
+ n_bits = n_bytes * 8
+ max = 2 ** (n_bits - 2) - 1
+ return "#{rand(max)}-#{rand(max)}-#{rand(max)}"
end
- end
-
- protected
-
- def reset
- # used by tests
- @redis = nil # clear instance of redis
- @dispatcher = nil
- @default_app_key = nil
- @default_queue = nil
- end
-
- def incoming_queue
- "resquebus_incoming"
- end
+
+ def publish(event_type, attributes = {})
+ to_publish = publish_metadata(event_type, attributes)
+ ResqueBus.log_application("Event published: #{event_type} #{to_publish.inspect}")
+ if local_mode
+ ResqueBus::Local.perform(to_publish)
+ else
+ enqueue_to(incoming_queue, Driver, to_publish)
+ end
+ end
+
+ def publish_at(timestamp_or_epoch, event_type, attributes = {})
+ to_publish = publish_metadata(event_type, attributes)
+ to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i
+ to_publish.delete("bus_published_at") unless attributes["bus_published_at"] # will be put on when it actually does it
+
+ ResqueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}")
+ item = delayed_job_to_hash_with_queue(incoming_queue, Publisher, [event_type, to_publish])
+ delayed_push(timestamp_or_epoch, item)
+ end
+
+ def enqueue_to(queue, klass, *args)
+ push(queue, :class => klass.to_s, :args => args)
+ end
+
+ def logger
+ @logger
+ end
+
+ def logger=val
+ @logger = val
+ end
+
+ def log_application(message)
+ if logger
+ time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
+ logger.info("** [#{time}] #$$: ResqueBus #{message}")
+ end
+ end
+
+ def log_worker(message)
+ if ENV['LOGGING'] || ENV['VERBOSE'] || ENV['VVERBOSE']
+ time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
+ puts "** [#{time}] #$$: #{message}"
+ end
+ end
+
+ protected
+
+ def reset
+ # used by tests
+ @redis = nil # clear instance of redis
+ @dispatcher = nil
+ @default_app_key = nil
+ @default_queue = nil
+ end
+
+ def incoming_queue
+ "resquebus_incoming"
+ end
- def default_namespace
- # It might play better on the same server, but overall life is more complicated
- :resque
- end
-
- ## From Resque, but using a (possibly) different instance of Redis
-
- # Pushes a job onto a queue. Queue name should be a string and the
- # item should be any JSON-able Ruby object.
- #
- # Resque works generally expect the `item` to be a hash with the following
- # keys:
- #
- # class - The String name of the job to run.
- # args - An Array of arguments to pass the job. Usually passed
- # via `class.to_class.perform(*args)`.
- #
- # Example
- #
- # Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ])
- #
- # Returns nothing
- def push(queue, item)
- watch_queue(queue)
- redis.rpush "queue:#{queue}", Resque.encode(item)
- end
-
- # Used internally to keep track of which queues we've created.
- # Don't call this directly.
- def watch_queue(queue)
- redis.sadd(:queues, queue.to_s)
- end
-
- ### From Resque Scheduler
- # Used internally to stuff the item into the schedule sorted list.
- # +timestamp+ can be either in seconds or a datetime object
- # Insertion if O(log(n)).
- # Returns true if it's the first job to be scheduled at that time, else false
- def delayed_push(timestamp, item)
- # First add this item to the list for this timestamp
- redis.rpush("delayed:#{timestamp.to_i}", Resque.encode(item))
+ def default_namespace
+ # It might play better on the same server, but overall life is more complicated
+ :resque
+ end
+
+ ## From Resque, but using a (possibly) different instance of Redis
+
+ # Pushes a job onto a queue. Queue name should be a string and the
+ # item should be any JSON-able Ruby object.
+ #
+ # Resque works generally expect the `item` to be a hash with the following
+ # keys:
+ #
+ # class - The String name of the job to run.
+ # args - An Array of arguments to pass the job. Usually passed
+ # via `class.to_class.perform(*args)`.
+ #
+ # Example
+ #
+ # Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ])
+ #
+ # Returns nothing
+ def push(queue, item)
+ watch_queue(queue)
+ redis.rpush "queue:#{queue}", Resque.encode(item)
+ end
+
+ # Used internally to keep track of which queues we've created.
+ # Don't call this directly.
+ def watch_queue(queue)
+ redis.sadd(:queues, queue.to_s)
+ end
+
+ ### From Resque Scheduler
+ # Used internally to stuff the item into the schedule sorted list.
+ # +timestamp+ can be either in seconds or a datetime object
+ # Insertion if O(log(n)).
+ # Returns true if it's the first job to be scheduled at that time, else false
+ def delayed_push(timestamp, item)
+ # First add this item to the list for this timestamp
+ redis.rpush("delayed:#{timestamp.to_i}", Resque.encode(item))
- # Now, add this timestamp to the zsets. The score and the value are
- # the same since we'll be querying by timestamp, and we don't have
- # anything else to store.
- redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
+ # Now, add this timestamp to the zsets. The score and the value are
+ # the same since we'll be querying by timestamp, and we don't have
+ # anything else to store.
+ redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
+ end
+
+ def delayed_job_to_hash_with_queue(queue, klass, args)
+ {:class => klass.to_s, :args => args, :queue => queue}
+ end
end
- def delayed_job_to_hash_with_queue(queue, klass, args)
- {:class => klass.to_s, :args => args, :queue => queue}
- end
-
-end
+end
\ No newline at end of file