# encoding: utf-8 require "logstash/outputs/base" require "logstash/namespace" require "socket" require "timeout" require "rexml/document" # The OpenNMS output is used to send an event (xml document) to an OpenNMS # server. The event `@timestamp` will automatically be associated with the # OpenNMS item data. # # NOTE: This plugin will log a warning if a necessary field is missing. It # will not attempt to resend if OpenNMS is down, but will log an error message. class LogStash::Outputs::OpenNMS < LogStash::Outputs::Base config_name "opennms" concurrency :single # The IP or resolvable hostname where the OpenNMS server is running config :opennms_server_hosts, :validate => :hash, :default => {"localhost" => 5817} # The number of seconds to wait before giving up on a connection to the OpenNMS # server. This number should be very small, otherwise delays in delivery of # other outputs could result. config :opennms_server_timeout, :validate => :number, :default => 1 # The field name which holds the OpenNMS time zone. This can be a sub-field of # the @metadata field. config :opennms_event_timezone, :validate => :string, :required => true # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_service, :validate => :string # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_nodeid, :validate => :number # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_interface, :validate => :string # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_uei, :validate => :string, :default => "uei.opennms.org/elastic/logstash/generic/state/1" # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_origin, :validate => :string, :default => "None" # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_severity, :validate => :string, :required => true, :default => "Warning" # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_source, :validate => :string, :default => "logstash" # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_description, :validate => :string, :default => "None" # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_logmsg, :validate => :string, :default => "None" # This directive cannot be used in conjunction with the single-value directives # `opennms_key` and `opennms_value`. config :opennms_event_parms, :validate => :hash # The field name which holds the OpenNMS ... This can be a sub-field of # the @metadata field. config :opennms_event_parms_order, :validate => :array public def register if !@opennms_key.nil? && !@multi_value.nil? @logger.warn("Cannot use multi_value in conjunction with opennms_key/opennms_value. Ignoring opennms_key.") end if @opennms_event_parms.nil? @opennms_event_parms = { 'a' => '1', 'b' => '2', 'c' => '3' } end # We're only going to use @multi_value in the end, so let's build it from # @opennms_key and @opennms_value if it is empty (single value configuration). if @multi_value.nil? @multi_value = [ @opennms_key, @opennms_value ] end if @multi_value.length % 2 == 1 raise LogStash::ConfigurationError, I18n.t("logstash.agent.configuration.invalid_plugin_register", :plugin => "output", :type => "opennms", :error => "Invalid opennms configuration #{@multi_value}. multi_value requires an even number of elements as ['opennms_key1', 'opennms_value1', 'opennms_key2', 'opennms_value2']") end end # def register public def field_check(event, fieldname) if !event.get(fieldname) @logger.warn("Field referenced by #{fieldname} is missing") false else true end end # def field_check public def kv_check(event, key_field, value_field) errors = 0 for field in [key_field, value_field] errors += 1 unless field_check(event, field) end errors < 1 ? true : false end # def kv_check public def validate_fields(event) found = [] (0..@multi_value.length-1).step(2) do |idx| @logger.warn("---> " + @multi_value[idx].to_s) if kv_check(event, @multi_value[idx], @multi_value[idx+1]) found << @multi_value[idx] found << @multi_value[idx+1] end end found end # def validate_fields public def validate_props(event,props,found) props.each do |props_key,props_value| if props_value.is_a?(::Hash) found[props_key] = validate_props(event,props_value,{}) elsif props_value.is_a?(::Array) found[props_key]=props_value else if !event.get(props_value) @logger.warn("Field referenced by #{props_value} is missing") found[props_key] = props_value.empty? ? "n/a" : props_value else found[props_key]=event.get(props_value) end end end found end # def validate_props # Build XML event document public def format_request(message) event_doc = "" event_doc += "" event_doc += "" event_doc += "" + message['uei'] + "" event_doc += "" + message['source'] + "" event_doc += "" + message['nodeid'] + "" if !message['nodeid'].empty? event_doc += "" event_doc += "" + message['origin'] + "" event_doc += "" + message['interface'] + "" if !message['interface'].empty? event_doc += "" + message ['service'] + "" if !message['service'].empty? event_doc += "" #message['parms'].each do |parm_name, parm_value| message['parms_order'].each do |parm_name| parm_value = message['parms'][parm_name] next if parm_name == 'severity' event_doc += "" event_doc += "" event_doc += "" event_doc += "" end event_doc += "" #event_doc += "" + message['description'] + "" if !message['description'].empty? event_doc += "" + message['logmsg'] + "" if !message['logmsg'].empty? event_doc += "" + message['severity'] + "" event_doc += "" event_doc += "" event_doc += "" xml_event = REXML::Document.new(event_doc) end # def format_request def tcp_send(message) begin @opennms_server_hosts.each do |server,port| TCPSocket.open(server,port) do |sock| xml_event = format_request(message).to_s logger.debug("XML Event: " + xml_event) sock.print xml_event end end rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ECONNRESET @logger.error("Connection error. Unable to connect to OpenNMS server", :server => @opennms_server_hosts, ) false end end # def tcp_send def send_to_opennms(event) begin Timeout::timeout(@opennms_server_timeout) do tcp_send(event) end rescue Timeout::Error @logger.warn("Connection attempt to OpenNMS server timed out.", :server => @opennms_server_host, :port => @opennms_server_port.to_s, :timeout => @opennms_server_timeout.to_s ) false end end # def send_to_opennms public def receive(event) #parms = validate_props(event,@opennms_event_parms) message_map = { 'host' => @opennms_server_host.to_s, 'port' => @opennms_server_port.to_s, 'timezone' => @opennms_event_timezone.to_s, 'service' => @opennms_event_service.to_s, 'nodeid' => @opennms_event_nodeid.to_s, 'interface' => @opennms_event_interface.to_s, 'uei' => @opennms_event_uei.to_s, 'origin' => @opennms_event_origin.to_s, 'severity' => @opennms_event_severity.to_s, 'source' => @opennms_event_source.to_s, 'description' => @opennms_event_description.to_s, 'logmsg' => @opennms_event_logmsg.to_s, 'parms' => @opennms_event_parms, 'parms_order' => @opennms_event_parms_order } message = validate_props(event,message_map,{}) #message['parms'].each do |k,v| # @logger.debug("Hash: " + k + " => " + v) #end #return unless field_check(event, @opennms_host) send_to_opennms(message) end # def receive end # class LogStash::Outputs::OpenNMS