require 'logger'
require 'eventbus/common_init'
require 'eventbus/queue'
require 'fileutils'

module EventBus

  # Base class for EventBus services.
  #
  # A subclass overrides +process_message+ (mandatory) and optionally
  # +handle_exception+, then calls +start+ to spin up worker threads that
  # each watch the service's queue and dispatch incoming messages.
  class Service

    # Name of the queue to listen on; defaults to the subclass name.
    attr_accessor :listen_queue
    # Passed to Queue.calc_name as opts[:system_queue]; defaults to false.
    attr_accessor :system_process
    # Passed to Queue.calc_name as opts[:global_process]; defaults to false.
    attr_accessor :global_process

    # Methods expected to be overridden in subclasses

    # Optional hook invoked by handle_exception_base after the error has been
    # logged and recorded on the outgoing error message, but before that
    # message is sent.
    #
    # message   - the EventBus::Message built by handle_exception_base.
    # exception - the exception raised by process_message.
    def handle_exception(message, exception)
      # Just a hook that can be overridden in the subclass. Framework base behavior is in handle_exception_base.
      # It does not need to be overridden, but can provide some custom behaviors.
    end

    # process_message is the main business-logic entry point for the subclass.
    # Subclasses _MUST_ override this; the base implementation always raises.
    #
    # message - the message delivered by watch_queue.
    def process_message(message)
      raise "You need to override the 'process_message' method in your subclass!"
    end

    # Sets up defaults and mixes the connection driver into this instance.
    #
    # application_id - identifier used when calculating the queue name
    #                  (defaults to EventBus.application_id).
    #
    # The driver is chosen by the EVENTBUS_CONNECTOR environment variable
    # ("Stomp" when unset): "eventbus/connectors/<name downcased>" is required
    # and the "<name>ConnectionDriver" module is extended onto this instance.
    def initialize(application_id = EventBus.application_id)
      @application_id = application_id
      @listen_queue = self.class.name
      @system_process = false
      @global_process = false

      @connection_driver = ENV["EVENTBUS_CONNECTOR"] || "Stomp"
      driver_module = "#{@connection_driver}ConnectionDriver"

      require "eventbus/connectors/#{@connection_driver.downcase}"

      # Pretty much just swiped from ActiveSupport "constantize"..
      conn_module =
        if Object.const_defined?(driver_module)
          Object.const_get(driver_module)
        else
          Object.const_missing(driver_module)
        end

      extend conn_module

      # Presumably supplied by the driver module extended just above.
      connection_driver_initialize
    end

    # Starts the worker threads that watch the queue.
    #
    # opts - Hash of options. The caller's hash is NOT modified.
    #        :max_workers   - number of worker threads to start (default 5).
    #        :async_service - when truthy, return immediately after starting
    #                         the workers; otherwise block forever
    #                         (default false).
    #        Remaining entries are forwarded to Queue.calc_name along with
    #        :system_queue and :global_process taken from this instance.
    def start(opts = {})
      # Work on a copy so the deletes/assignments below do not leak back
      # into the hash owned by the caller.
      opts = opts.dup

      max_workers = opts.delete(:max_workers) || 5
      async_service = opts.delete(:async_service) || false

      opts[:system_queue] = @system_process
      opts[:global_process] = @global_process

      queue_name = Queue.calc_name(listen_queue, @application_id, EventBus.PROD_LEVEL, opts)

      # I tried to have a single connection dole out to a worker pool, but I was having
      # weird issues which I think were due to race conditions on the broker connections.
      # Just spin off threads prior to starting connections so every thread has a private
      # connection.
      1.upto(max_workers) do
        Thread.new do
          watch_queue(queue_name) do |message|
            process_message_wrapper message
          end
        end
      end

      # If async_service is true, we just return and leave it up to the
      # caller to not exit too soon. Otherwise we block this thread forever
      # so the worker threads keep running.
      return if async_service

      loop do
        sleep 6000
      end
    end

    # Returns the shared EventBus logger.
    def logger
      EventBus.logger
    end

    # Replaces the shared EventBus logger (affects EventBus.logger globally).
    def logger=(new_logger)
      EventBus.logger = new_logger
    end

    private

    # Logs the incoming message, hands it to process_message, and routes any
    # StandardError it raises to handle_exception_base.
    def process_message_wrapper(message)
      logger.info "Beginning processing of message in new thread:"
      logger.info message
      process_message message
    rescue => e
      handle_exception_base message, e
    end

    # Framework-level error handling: logs the exception and its backtrace,
    # builds an "EVENTBUS_CORE" message loaded from the failed message, marks
    # it with the error, gives the subclass hook a chance to act, then sends it.
    def handle_exception_base(message, exception)
      logger.error "Caught exception. Exception Text: #{exception}"
      logger.error "Backtrace:"
      # backtrace is nil for an exception that was never raised; Array() makes
      # that case a no-op instead of a NoMethodError.
      Array(exception.backtrace).each { |frame| logger.error "- #{frame}" }

      msg = EventBus::Message.new("EVENTBUS_CORE")
      msg.load(message)
      msg.set_error exception

      handle_exception(msg, exception)

      msg.send
    end

    # This shouldn't be used deliberately when it can be helped (i.e. in EventBus code),
    # but it re-directs puts in libs to logger.
    def puts(message)
      logger.info message
    end

  end

end # module EventBus