require 'rubygems' require 'bunny' require 'yaml' require 'uuid' require 'eventbus/queue' module EventBus class Message attr_reader :data def initialize(application_id) raise "No application ID specified!" if application_id.nil? @data = { :HEADER => { :message_id => UUID.new.generate.to_s, :message_type => self.default_section_name, :status => "BEGIN", :application_id => application_id }, :ERROR_INFO => { :is_error => false, :backtrace => nil, :message => nil }, :PAYLOAD => {} } @connection_driver = ENV["EVENTBUS_CONNECTOR"] || "Bunny" driver_module = "#{@connection_driver}ConnectionDriver" require_relative "connectors/#{@connection_driver.downcase}" # Eh, pretty much just swiped from ActiveSupport "constantize".. conn_module = Object.const_defined?(driver_module) ? Object.const_get(driver_module) : Object.const_missing(driver_module) extend conn_module connection_driver_initialize end def set(key, val, opts = {}) raise "You must specify a key name!" if key.nil? section = opts.delete(:section) || self.default_section_name @data[:PAYLOAD][section] = {} if @data[:PAYLOAD][section].nil? @data[:PAYLOAD][section][key] = val end def get(key, opts = {}) raise "You must specify a key name!" if key.nil? section = opts.delete(:section) || self.default_section_name return @data[:PAYLOAD][section].nil? ? nil : @data[:PAYLOAD][section][key] end # opts: # :queue_name -- the base queue name to receive the message. Defaults to "dispatcher" # :global_queue -- Do not prepend the application ID and do not append the environment # :system_queue -- Do not prepend the application ID to the queue_name # # Normally a queue name is calculated to be: applicationid.queue_name.environment # so that eventbus can handle multiple applications on the same eventbus instance, and # multiple environment-scoped eventbuses on the same broker. By applying :global_queue # or :system_queue, you are breaking that scoping to some degree. Maybe that's what you want, # but be careful or you might get cross-talk between applications or environments! def send(opts = {}) queue_name = opts.delete(:queue_name) if queue_name.nil? queue_name = "dispatcher" opts[:system_queue] = true end opts[:queue_name] = Queue.calc_name(queue_name, application_id, ENV['PROD_LEVEL'], opts) puts "Sending message to: #{opts[:queue_name]}" set_special(:HEADER, :sender, $0) send_raw self.dump, opts end def set_error(exception) @data[:ERROR_INFO][:backtrace] = exception.backtrace @data[:ERROR_INFO][:message] = exception.to_s self.is_error = true self.status = exception.class.name end def dump YAML::dump(@data) end def load(yaml) @data = YAML::load(yaml) end def application_id return get_special(:HEADER, :application_id) end def message_id return get_special(:HEADER, :message_id) end def message_type return get_special(:HEADER, :message_type) end def message_type=(val) set_special(:HEADER, :message_type, val) end def status return get_special(:HEADER, :status) end def status=(val) set_special(:HEADER, :status, val) end def error_message get_special(:ERROR_INFO, :message) end def error_message=(val) set_special(:ERROR_INFO, :message, val) end def is_error get_special(:ERROR_INFO, :is_error) end def is_error=(val) set_special(:ERROR_INFO, :is_error, val) end def backtrace return get_special(:ERROR_INFO, :backtrace) end def backtrace=(val) set_special(:ERROR_INFO, :backtrace, val) end def clear_error_info self.is_error = false self.backtrace = [] self.error_message = nil end def default_section_name return self.class.name.split(/::/).last end private def set_special(block, key, val) @data[block][key] = val end def get_special(block, key) return @data[block][key] end end end # module EventBus