lib/message_driver/broker.rb in message-driver-0.2.2 vs lib/message_driver/broker.rb in message-driver-0.3.0
- old
+ new
@@ -1,42 +1,103 @@
-require 'forwardable'
-require 'logger'
-
module MessageDriver
class Broker
- extend Forwardable
+ include Logging
- attr_reader :adapter, :configuration, :destinations, :consumers, :logger
+ DEFAULT_BROKER_NAME = :default
+ attr_reader :adapter, :configuration, :destinations, :consumers, :name
+
class << self
- def configure(options)
- @instance = new(options)
+ def configure(name = DEFAULT_BROKER_NAME, options)
+ if brokers.keys.include? name
+ raise BrokerAlreadyConfigured, "there is already a broker named #{name} configured"
+ end
+ brokers[name] = new(name, options)
end
- def method_missing(m, *args, &block)
- @instance.send(m, *args, &block)
+ def define(name = DEFAULT_BROKER_NAME)
+ yield broker(name)
end
- def instance
- @instance
+ def broker(name = DEFAULT_BROKER_NAME)
+ result = brokers[name]
+ if result.nil?
+ raise BrokerNotConfigured, "There is no broker named #{name} configured. The configured brokers are #{brokers.keys}"
+ end
+ result
end
- def define
- yield @instance
+ def client(name)
+ unless result = clients[name]
+ result = clients[name] = Client.for_broker(name)
+ end
+ result
end
+
+ def stop_all
+ each_broker do |brk|
+ brk.stop
+ end
+ end
+
+ def restart_all
+ each_broker do |brk|
+ brk.restart
+ end
+ end
+
+ def reset_after_tests
+ each_broker do |brk|
+ brk.adapter.reset_after_tests
+ end
+ end
+
+ def reset
+ each_broker do |brk|
+ begin
+ brk.stop
+ rescue => e
+ Logging.logger.warn Logging.message_with_exception("error stopping broker #{brk.name}", e)
+ end
+ end
+ brokers.clear
+ end
+
+ private
+ def brokers
+ @brokers ||= { }
+ end
+
+ def clients
+ @clients ||= { }
+ end
+
+ def each_broker
+ brokers.keys.each do |k|
+ yield brokers[k]
+ end
+ end
end
- def initialize(options)
+ def initialize(name = DEFAULT_BROKER_NAME, options)
+ @name = name
@adapter = resolve_adapter(options[:adapter], options)
@stopped = false
@configuration = options
@destinations = {}
@consumers = {}
- @logger = options[:logger] || Logger.new(STDOUT).tap{|l| l.level = Logger::INFO}
logger.debug "MessageDriver configured successfully!"
end
+ def logger
+ MessageDriver.logger
+ end
+
+ def client
+ @client ||= self.class.client(name)
+ end
+
def stop
@adapter.stop
@stopped = true
end
@@ -51,15 +112,15 @@
@adapter = resolve_adapter(@configuration[:adapter], @configuration)
@stopped = false
end
def dynamic_destination(dest_name, dest_options={}, message_props={})
- Client.dynamic_destination(dest_name, dest_options, message_props)
+ client.dynamic_destination(dest_name, dest_options, message_props)
end
def destination(key, dest_name, dest_options={}, message_props={})
- dest = Client.dynamic_destination(dest_name, dest_options, message_props)
+ dest = self.dynamic_destination(dest_name, dest_options, message_props)
@destinations[key] = dest
end
def consumer(key, &block)
raise MessageDriver::Error, "you must provide a block" unless block_given?
@@ -85,11 +146,11 @@
when nil
raise "you must specify an adapter"
when Symbol, String
resolve_adapter(find_adapter_class(adapter), options)
when Class
- resolve_adapter(adapter.new(options), options)
+ resolve_adapter(adapter.new(self, options), options)
when MessageDriver::Adapters::Base
adapter
else
raise "adapter must be a MessageDriver::Adapters::Base, but this object is a #{adapter.class}"
end
@@ -99,10 +160,10 @@
require "message_driver/adapters/#{adapter_name}_adapter"
adapter_method = "#{adapter_name}_adapter"
unless respond_to?(adapter_method)
- raise "the adapter #{adapter_name} must provide MessageDriver::Broker.#{adapter_method} that returns the adapter class"
+ raise "the adapter #{adapter_name} must provide MessageDriver::Broker##{adapter_method} that returns the adapter class"
end
send(adapter_method)
end
end