require 'logger'
require 'eventbus/common_init'
require 'eventbus/queue'
require 'fileutils'

module EventBus

  # Base class for EventBus services. A service watches one queue and hands
  # every incoming message to #process_message on its own thread.
  #
  # Subclasses MUST override #process_message and MAY override
  # #handle_exception.
  class Service

    # Base name of the queue to watch. Defaults to the name of the concrete class.
    attr_accessor :listen_queue
    # Forwarded to Queue.calc_name as opts[:system_queue]. Defaults to false.
    attr_accessor :system_process
    # Forwarded to Queue.calc_name as opts[:global_process]. Defaults to false.
    attr_accessor :global_process

    # Methods expected to be overridden in subclasses

    # Hook invoked from handle_exception_base after the error has been logged
    # and attached to the outgoing error message, but before that message is
    # sent. The default implementation does nothing; overriding is optional.
    #
    # message   - the EventBus::Message that is about to be sent.
    # exception - the exception rescued while processing.
    def handle_exception(message, exception)
      # Just a hook that can be overridden in the subclass. Framework base
      # behavior is in handle_exception_base. It does not need to be
      # overridden, but can provide some custom behaviors.
    end

    # process_message is the main business-logic entry point for the subclass.
    # Subclasses _MUST_ override this; the base implementation always raises.
    #
    # message - the message handed over by watch_queue.
    def process_message(message)
      raise "You need to override the 'process_message' method in your subclass!"
    end

    # Sets defaults and mixes the broker connection driver into this instance.
    #
    # The driver name comes from ENV["EVENTBUS_CONNECTOR"] (default "Stomp").
    # The file "eventbus/connectors/<downcased name>" is required and this
    # instance is extended with the module "<name>ConnectionDriver".
    #
    # application_id - identifier used when calculating the queue name.
    def initialize(application_id = EventBus.application_id)
      @application_id = application_id
      @listen_queue = self.class.name
      @system_process = false
      @global_process = false

      @connection_driver = ENV["EVENTBUS_CONNECTOR"] || "Stomp"
      driver_module = "#{@connection_driver}ConnectionDriver"

      require "eventbus/connectors/#{@connection_driver.downcase}"

      # Pretty much just swiped from ActiveSupport "constantize".
      conn_module =
        if Object.const_defined?(driver_module)
          Object.const_get(driver_module)
        else
          Object.const_missing(driver_module)
        end

      extend conn_module

      # NOTE(review): presumably supplied by the driver module mixed in above.
      connection_driver_initialize
    end

    # Starts watching the queue and dispatches each message to a new thread.
    # This method never returns: once the watcher is set up, the calling
    # thread sleeps in an endless loop.
    #
    # opts - options Hash. :max_workers (default 5) limits concurrent worker
    #        threads and is consumed here; the remaining entries, plus
    #        :system_queue and :global_process, are passed to Queue.calc_name.
    #        The caller's hash is not modified.
    def start(opts = {})
      # Work on a copy so the caller's hash is left untouched.
      options = opts.dup
      max_workers = options.delete(:max_workers) || 5

      options[:system_queue] = @system_process
      options[:global_process] = @global_process

      queue_name = Queue.calc_name(listen_queue, @application_id, EventBus.PROD_LEVEL, options)

      # NOTE(review): watch_queue is presumably supplied by the connection driver.
      watch_queue(queue_name) do |message|
        # The +2 stuff is because the main thread counts in the list, even
        # though it's not a "worker" thread. What makes it worse is that,
        # depending on the broker connector, watch_queue may also start a
        # thread. With this implementation, that extra thread (e.g. with the
        # stomp connector) would count in this. Bottom line is allow a little
        # wiggle room with the :max_workers. Don't tighten down the threads
        # too much for now.
        sleep 0.25 while Thread.list.length >= max_workers + 2

        Thread.new do
          process_message_wrapper message
        end
      end

      # Just sleep in the main thread. Nothing going on here anymore.
      loop { sleep 6000 }
    end

    # Returns the logger shared through EventBus.logger.
    def logger
      EventBus.logger
    end

    # Replaces the logger shared through EventBus.logger.
    def logger=(new_logger)
      EventBus.logger = new_logger
    end

    private

    # Runs process_message for one message, logging it first and routing any
    # StandardError to handle_exception_base.
    def process_message_wrapper(message)
      logger.info "Beginning processing of message in new thread:"
      logger.info message
      process_message message
    rescue => e
      handle_exception_base message, e
    end

    # Framework-level error handling: logs the exception and its backtrace,
    # builds an "EVENTBUS_CORE" message loaded from the original message with
    # the error set, gives the subclass hook a chance to act, then sends it.
    def handle_exception_base(message, exception)
      logger.error "Caught exception. Exception Text: #{exception}"
      logger.error "Backtrace:"
      # Array() guards against a nil backtrace (an exception that was never raised).
      Array(exception.backtrace).each { |frame| logger.error "- #{frame}" }

      msg = EventBus::Message.new("EVENTBUS_CORE")
      msg.load(message)
      msg.set_error exception

      handle_exception(msg, exception)

      msg.send
    end

    # This shouldn't be used deliberately when it can be helped (i.e. in
    # EventBus code), but it re-directs puts in libs to logger.
    def puts(message)
      logger.info message
    end

  end

end # module EventBus