lib/message_driver/broker.rb in message-driver-0.2.2 vs lib/message_driver/broker.rb in message-driver-0.3.0

- old
+ new

@@ -1,42 +1,103 @@ -require 'forwardable' -require 'logger' - module MessageDriver class Broker - extend Forwardable + include Logging - attr_reader :adapter, :configuration, :destinations, :consumers, :logger + DEFAULT_BROKER_NAME = :default + attr_reader :adapter, :configuration, :destinations, :consumers, :name + class << self - def configure(options) - @instance = new(options) + def configure(name = DEFAULT_BROKER_NAME, options) + if brokers.keys.include? name + raise BrokerAlreadyConfigured, "there is already a broker named #{name} configured" + end + brokers[name] = new(name, options) end - def method_missing(m, *args, &block) - @instance.send(m, *args, &block) + def define(name = DEFAULT_BROKER_NAME) + yield broker(name) end - def instance - @instance + def broker(name = DEFAULT_BROKER_NAME) + result = brokers[name] + if result.nil? + raise BrokerNotConfigured, "There is no broker named #{name} configured. The configured brokers are #{brokers.keys}" + end + result end - def define - yield @instance + def client(name) + unless result = clients[name] + result = clients[name] = Client.for_broker(name) + end + result end + + def stop_all + each_broker do |brk| + brk.stop + end + end + + def restart_all + each_broker do |brk| + brk.restart + end + end + + def reset_after_tests + each_broker do |brk| + brk.adapter.reset_after_tests + end + end + + def reset + each_broker do |brk| + begin + brk.stop + rescue => e + Logging.logger.warn Logging.message_with_exception("error stopping broker #{brk.name}", e) + end + end + brokers.clear + end + + private + def brokers + @brokers ||= { } + end + + def clients + @clients ||= { } + end + + def each_broker + brokers.keys.each do |k| + yield brokers[k] + end + end end - def initialize(options) + def initialize(name = DEFAULT_BROKER_NAME, options) + @name = name @adapter = resolve_adapter(options[:adapter], options) @stopped = false @configuration = options @destinations = {} @consumers = {} - @logger = options[:logger] || Logger.new(STDOUT).tap{|l| l.level = Logger::INFO} logger.debug "MessageDriver configured successfully!" end + def logger + MessageDriver.logger + end + + def client + @client ||= self.class.client(name) + end + def stop @adapter.stop @stopped = true end @@ -51,15 +112,15 @@ @adapter = resolve_adapter(@configuration[:adapter], @configuration) @stopped = false end def dynamic_destination(dest_name, dest_options={}, message_props={}) - Client.dynamic_destination(dest_name, dest_options, message_props) + client.dynamic_destination(dest_name, dest_options, message_props) end def destination(key, dest_name, dest_options={}, message_props={}) - dest = Client.dynamic_destination(dest_name, dest_options, message_props) + dest = self.dynamic_destination(dest_name, dest_options, message_props) @destinations[key] = dest end def consumer(key, &block) raise MessageDriver::Error, "you must provide a block" unless block_given? @@ -85,11 +146,11 @@ when nil raise "you must specify an adapter" when Symbol, String resolve_adapter(find_adapter_class(adapter), options) when Class - resolve_adapter(adapter.new(options), options) + resolve_adapter(adapter.new(self, options), options) when MessageDriver::Adapters::Base adapter else raise "adapter must be a MessageDriver::Adapters::Base, but this object is a #{adapter.class}" end @@ -99,10 +160,10 @@ require "message_driver/adapters/#{adapter_name}_adapter" adapter_method = "#{adapter_name}_adapter" unless respond_to?(adapter_method) - raise "the adapter #{adapter_name} must provide MessageDriver::Broker.#{adapter_method} that returns the adapter class" + raise "the adapter #{adapter_name} must provide MessageDriver::Broker##{adapter_method} that returns the adapter class" end send(adapter_method) end end