require 'rubygems'
require 'json'
require 'uuid'
require 'eventbus/common_init'
require 'eventbus/queue'

module EventBus

  # A message with three top-level sections held in +data+:
  #
  #   'HEADER'     -- message_id, message_type, status, application_id
  #                   (plus 'sender', added by #send)
  #   'ERROR_INFO' -- is_error, backtrace, message
  #   'PAYLOAD'    -- hash of named sections, each a hash of key => value
  #
  # Hash keys are always stored as strings, so a message that has been
  # through #dump / #load looks the same as a freshly built one.
  #
  # On construction the instance is extended with a connection driver module
  # chosen by ENV["EVENTBUS_CONNECTOR"]. This class calls
  # +connection_driver_initialize+ and +send_raw+ without defining them;
  # presumably the driver module supplies both -- confirm against the files
  # under connectors/.
  class Message

    attr_reader :data

    # Builds an empty message and attaches the connection driver.
    #
    # application_id -- stored in the header; defaults to
    #                   EventBus.application_id.
    #
    # Raises RuntimeError when application_id is nil.
    def initialize(application_id = EventBus.application_id)
      raise "No application ID specified!" if application_id.nil?

      @data = {
        'HEADER' => {
          'message_id' => UUID.new.generate.to_s,
          'message_type' => default_section_name,
          'status' => "BEGIN",
          'application_id' => application_id
        },
        'ERROR_INFO' => {
          'is_error' => false,
          'backtrace' => nil,
          'message' => nil
        },
        'PAYLOAD' => {}
      }

      # Driver name defaults to "Stomp", i.e. connectors/stomp and
      # StompConnectionDriver.
      @connection_driver = ENV["EVENTBUS_CONNECTOR"] || "Stomp"
      driver_module = "#{@connection_driver}ConnectionDriver"
      require_relative "connectors/#{@connection_driver.downcase}"

      # Eh, pretty much just swiped from ActiveSupport "constantize"..
      conn_module = Object.const_defined?(driver_module) ? Object.const_get(driver_module) : Object.const_missing(driver_module)
      extend conn_module

      connection_driver_initialize
    end

    # Stores +val+ under +key+ in a payload section.
    #
    # key  -- key name (converted with to_s); must not be nil.
    # val  -- value to store. Strings are tagged as UTF-8 (see #coerce_utf8).
    # opts -- :section names the payload section; defaults to
    #         #default_section_name. The hash is only read, never modified.
    #
    # Returns the stored value. Raises RuntimeError when key is nil.
    def set(key, val, opts = {})
      # Validate before touching val so a rejected call has no side effects.
      raise "You must specify a key name!" if key.nil?

      section = (opts[:section] || default_section_name).to_s
      @data['PAYLOAD'][section] = {} if @data['PAYLOAD'][section].nil?
      @data['PAYLOAD'][section][key.to_s] = coerce_utf8(val)
    end

    # Reads the value stored under +key+ in a payload section.
    #
    # key  -- key name (converted with to_s); must not be nil.
    # opts -- :section names the payload section; defaults to
    #         #default_section_name. The hash is only read, never modified.
    #
    # Returns the value, or nil when the section or the key does not exist.
    # Raises RuntimeError when key is nil.
    def get(key, opts = {})
      raise "You must specify a key name!" if key.nil?

      section = (opts[:section] || default_section_name).to_s
      section_data = @data['PAYLOAD'][section]
      section_data.nil? ? nil : section_data[key.to_s]
    end

    # Serializes the message and hands it to the connection driver.
    #
    # NOTE: this shadows Object#send on Message instances; use __send__ or
    # public_send for dynamic dispatch on a Message.
    #
    # opts:
    #   :queue_name   -- the base queue name to receive the message. Defaults to "dispatcher"
    #   :global_queue -- Do not prepend the application ID and do not append the environment
    #   :system_queue -- Do not prepend the application ID to the queue_name
    #
    # Normally a queue name is calculated to be: applicationid.queue_name.environment
    # so that eventbus can handle multiple applications on the same eventbus instance, and
    # multiple environment-scoped eventbuses on the same broker. By applying :global_queue
    # or :system_queue, you are breaking that scoping to some degree. Maybe that's what you want,
    # but be careful or you might get cross-talk between applications or environments!
    #
    # When :queue_name is omitted, "dispatcher" is used and :system_queue is
    # forced to true.
    def send(opts = {})
      # Work on a copy: the calculated queue name is written into the options
      # below, and writing it into the caller's hash would make a reused hash
      # feed an already-calculated name back into Queue.calc_name.
      opts = opts.dup

      queue_name = opts.delete(:queue_name)

      if queue_name.nil?
        queue_name = "dispatcher"
        opts[:system_queue] = true
      end

      opts[:queue_name] = Queue.calc_name(queue_name, application_id, ENV['EVENTBUS_PROD_LEVEL'], opts)

      puts "Sending message to: #{opts[:queue_name]}"

      set_special(:HEADER, :sender, $0)
      send_raw dump, opts
    end

    # Records +exception+ in ERROR_INFO: marks the message as an error, stores
    # the exception's message and backtrace, and sets the header status to the
    # exception's class name.
    #
    # An exception that was never raised has a nil backtrace; nil is stored in
    # that case. Frozen message/backtrace strings are copied rather than
    # modified.
    def set_error(exception)
      frames = exception.backtrace
      frames = frames.map { |frame| coerce_utf8(frame) } unless frames.nil?

      self.backtrace = frames
      self.error_message = exception.message
      self.is_error = true
      self.status = exception.class.name
    end

    # Returns the whole message as a pretty-printed JSON string.
    def dump
      JSON.generate(@data, :indent => '  ', :space => ' ', :object_nl => "\n", :array_nl => "\n", :max_nesting => 50)
    end

    # Replaces the message contents with the parsed JSON document +raw+.
    # The result is not checked for the HEADER / ERROR_INFO / PAYLOAD sections.
    def load(raw)
      @data = JSON.parse(raw)
    end

    # --- HEADER accessors ---------------------------------------------------

    def application_id
      get_special(:HEADER, :application_id)
    end

    def message_id
      get_special(:HEADER, :message_id)
    end

    def message_type
      get_special(:HEADER, :message_type)
    end

    def message_type=(val)
      set_special(:HEADER, :message_type, val)
    end

    def status
      get_special(:HEADER, :status)
    end

    def status=(val)
      set_special(:HEADER, :status, val)
    end

    # --- ERROR_INFO accessors -----------------------------------------------

    def error_message
      get_special(:ERROR_INFO, :message)
    end

    def error_message=(val)
      set_special(:ERROR_INFO, :message, val)
    end

    def is_error
      get_special(:ERROR_INFO, :is_error)
    end

    def is_error=(val)
      set_special(:ERROR_INFO, :is_error, val)
    end

    def backtrace
      get_special(:ERROR_INFO, :backtrace)
    end

    def backtrace=(val)
      set_special(:ERROR_INFO, :backtrace, val)
    end

    # Resets ERROR_INFO to "no error". The backtrace becomes an empty array
    # here, whereas a freshly constructed message starts with nil.
    def clear_error_info
      self.is_error = false
      self.backtrace = []
      self.error_message = nil
    end

    # Unqualified class name (the part after the last "::"). Used as the
    # default message_type and as the default payload section name.
    def default_section_name
      self.class.name.split(/::/).last
    end

    private

    # Writes +val+ to @data[block][key]; block and key are converted with
    # to_s. Strings are tagged as UTF-8 first (see #coerce_utf8).
    def set_special(block, key, val)
      @data[block.to_s][key.to_s] = coerce_utf8(val)
    end

    # Reads @data[block][key]; block and key are converted with to_s.
    def get_special(block, key)
      @data[block.to_s][key.to_s]
    end

    # Tags a String as UTF-8 and returns it; any other value is returned
    # untouched.
    #
    # This is for convenience and to make messages more human-readable by
    # eliminating the !Binary encoding in YAML messages. It deliberately does
    # not walk deep structures, because this isn't a functionality issue: the
    # message deserializes just fine without it. If your YAML is popping up
    # with random !binary segments *and this bothers you*, just do a
    # force_encoding("UTF-8") on the members of the structure as needed.
    #
    # A frozen string is copied first; an unfrozen one is re-tagged in place,
    # so the caller's own string object changes encoding too.
    def coerce_utf8(val)
      return val unless val.is_a?(String)

      val = val.dup if val.frozen? # Copy to non-frozen string.
      val.force_encoding("UTF-8")
    end

  end

end # module EventBus