require 'logger' require 'eventbus/queue' module EventBus class Service attr_accessor :listen_queue attr_accessor :system_process attr_accessor :global_process attr_accessor :logger # Methods expected to be overridden in subclasses def handle_exception(message, exception) # Just a hook that can be overridden in the subclass. Framework base behavior is in handle_exception_base. # It does not need to be overridden, but can provide some custom behaviors. end # process_message is the main business-logic entry point for the subclass. Subclasses _MUST_ override this. def process_message(message) raise "You need to override the 'process_message' method in your subclass!" end def initialize(application_id = nil) @listen_queue = self.class.name @application_id = application_id || raise("No application ID specified!") @logger = Logger.new(STDOUT) @system_process = false @global_process = false @connection_driver = ENV["EVENTBUS_CONNECTOR"] || "Stomp" driver_module = "#{@connection_driver}ConnectionDriver" require "eventbus/connectors/#{@connection_driver.downcase}" # Pretty much just swiped from ActiveSupport "constantize".. conn_module = Object.const_defined?(driver_module) ? Object.const_get(driver_module) : Object.const_missing(driver_module) extend conn_module connection_driver_initialize end def start ( opts = {} ) max_workers = opts.delete(:max_workers) || 5 opts[:system_queue] = @system_process opts[:global_process] = @global_process queue_name = Queue.calc_name(listen_queue, @application_id, ENV['PROD_LEVEL'], opts) watch_queue(queue_name) do |message| # The +2 stuff is because the main thread counts in the list, even though it's not a "worker" thread. # What makes it worse is that, depending on the broker connector, watch_queue may also start a thread. # With this implementation, that extra thread (e.g. with the stomp connector) would count in this. # Bottom line is allow a little wiggle room with the :max_workers. Don't tighten down the threads too much # for now. while (Thread.list.length >= max_workers + 2) do sleep 0.25 end Thread.new do process_message_wrapper message end end # Just sleep in the main thread. Nothing going on here anymore. loop do sleep 6000 end end private def process_message_wrapper(message) begin puts "Beginning processing of message in new thread:" puts message process_message message rescue handle_exception_base message, $! end end def handle_exception_base(message, exception) puts "Caught exception. Exception Text: #{exception}" puts "Backtrace:" exception.backtrace.map { | frame | puts "- #{frame}"} msg = EventBus::Message.new("CORE") msg.load(message) msg.set_error exception self.handle_exception(msg, exception) msg.send end def puts(message) @logger.info message end end end # module EventBus