#!/usr/bin/env ruby

# Location of the admin/portal application whose config/environment is loaded
# below. Overridable through the EVENTBUS_PORTAL_BASE environment variable.
admin_base = ENV['EVENTBUS_PORTAL_BASE'] || './admin'

puts "Loading environment.rb from: #{admin_base}/config/"

require 'eventbus/service'
require 'eventbus/message'
require "#{admin_base}/config/environment"

# EventBus service that listens on the "dispatcher" queue. For every incoming
# message it looks up the owning Application and its message type, re-sends the
# message to the queue named by each matching route, and writes a Message log
# record describing the outcome.
class DispatcherService < EventBus::Service

  # Declared for dispatcher configuration problems. Not raised anywhere in this
  # file; kept so external references to the constant keep resolving.
  class ConfigurationError < RuntimeError
  end

  # Registers the service under the "EVENT_BUS_CORE" identifier, listening on
  # the "dispatcher" queue, and marks it as a system process.
  def initialize
    super("EVENT_BUS_CORE")
    self.listen_queue = "dispatcher"
    self.system_process = true
  end

  # Routes one incoming message.
  #
  # @param incoming_msg the raw payload handed over by the service loop; it is
  #   passed unchanged to EventBus::Message#load.
  # @return the result of the final log_message call when no route matched,
  #   nil otherwise.
  def process_message(incoming_msg)
    msg = EventBus::Message.new("EVENT_BUS_CORE")
    msg.load(incoming_msg)

    puts "Application Key: #{msg.application_id}"
    puts "Message ID: #{msg.message_id}"
    puts "Message type: #{msg.message_type}"
    puts "Message status: #{msg.status}"

    puts "Finding application for key #{msg.application_id}"
    app = Application.find_by_key(msg.application_id)

    if app.nil?
      puts "Unable to find an application with key #{msg.application_id}"
      mark_configuration_error(msg, "Dispatcher has no application defined with key #{msg.application_id}")
      log_message nil, msg
      return
    end

    puts "Found application: #{app.inspect}"

    msg_type = app.message_types.find_by_name(msg.message_type)

    if msg_type.nil?
      # The message is logged as received, then logged again below once it has
      # been flagged as an error.
      # NOTE(review): the unknown-application branch above logs only once --
      # confirm the double entry here is intended.
      log_message app, msg
      puts "No configuration found for Application Key #{app.key}, MessageType #{msg.message_type}"
      mark_configuration_error(msg, "Application #{app.name} (#{app.key}) has no message type defined as: (#{msg.message_type})")
      log_message app, msg
      return
    end

    count = dispatch_to_routes(msg_type.message_routes.where(:incoming_status => msg.status),
                               "Found route", app, msg, msg_type)

    # Error messages additionally go to every route flagged match_errors.
    # NOTE(review): a route satisfying both queries is sent the message twice
    # -- confirm that is intended.
    if msg.is_error
      count += dispatch_to_routes(msg_type.message_routes.where(:match_errors => true),
                                  "Matched error route", app, msg, msg_type)
    end

    if count == 0
      puts "No routes found for MessageType #{msg_type.name} Status #{msg.status}. Will log only."
      log_message app, msg, msg_type
    end
  end

  # Builds and saves a Message log record from whichever arguments are given.
  #
  # @param application source of the record's application_id (skipped when nil)
  # @param msg EventBus message whose key, type, status, dumped content, error
  #   flag and id are copied onto the record (skipped when nil)
  # @param msg_type source of the record's message_type_id (skipped when nil)
  # @param route source of the record's dispatch_to (skipped when nil)
  # @return whatever Message#save returns
  def log_message(application = nil, msg = nil, msg_type = nil, route = nil)
    log = Message.new

    log.application_id = application.id unless application.nil?

    unless msg.nil?
      log.application_key = msg.application_id
      log.message_type_name = msg.message_type
      log.status = msg.status
      log.content = msg.dump
      log.is_error = msg.is_error
      log.message_guid = msg.message_id
    end

    log.message_type_id = msg_type.id unless msg_type.nil?
    log.dispatch_to = route.dispatch_to unless route.nil?

    # Surface a failed save instead of dropping it silently.
    # NOTE(review): assumes Message#save returns a falsy value on failure
    # (ActiveRecord-style) -- confirm against the model.
    saved = log.save
    puts "Warning: unable to save dispatcher log record" unless saved
    saved
  end

  private

  # Flags msg as a dispatcher configuration error carrying the given text.
  def mark_configuration_error(msg, error_text)
    msg.status = "DispatcherConfigurationError"
    msg.is_error = true
    msg.error_message = error_text
  end

  # Sends msg to the queue of every route in routes, logging each dispatch.
  # label prefixes the console line printed per route.
  # Returns the number of routes the message was sent to.
  def dispatch_to_routes(routes, label, app, msg, msg_type)
    dispatched = 0
    routes.each do |route|
      puts "#{label}: #{route.dispatch_to}"
      msg.send(:queue_name => route.dispatch_to)
      dispatched += 1
      log_message app, msg, msg_type, route
    end
    dispatched
  end

end

DispatcherService.new.start(:max_workers => 5)