# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "net/http"
require "uri"
require "json"
require "socket"
require "concurrent"

# Logstash output that buffers events and posts them in batches to a Splunk
# HTTP Event Collector (HEC) endpoint at `/services/collector/event`.
#
# A flush is attempted from #receive whenever the buffer holds at least
# `batch_size` events or `flush_interval` seconds have elapsed since the last
# successful send. A batch that cannot be delivered after the configured number
# of attempts is appended back to the buffer so a later flush can retry it.
class LogStash::Outputs::SplunkHec < LogStash::Outputs::Base
  config_name "splunk_hec"
  concurrency :shared

  # HEC token sent in the `Authorization: Splunk <token>` header.
  config :hec_token, :validate => :string, :required => true
  # Hostname of the Splunk HEC endpoint.
  config :hec_host, :validate => :string, :required => true
  # The literal "none" means "not configured": the value is then derived per event.
  config :host, :validate => :string, :default => "none"
  config :source, :validate => :string, :default => "none"
  config :sourcetype, :validate => :string, :default => "none"
  config :port, :validate => :number, :default => 443
  config :index, :validate => :string, :default => "main"
  # Maximum number of events sent in one HTTP request.
  config :batch_size, :validate => :number, :default => 100
  # Seconds since the last successful send after which a flush is attempted.
  config :flush_interval, :validate => :number, :default => 5
  # Number of send attempts per batch (at least one attempt is always made).
  config :retry_count, :validate => :number, :default => 3

  public

  # Builds the HTTP client, the target URI and the in-memory buffer.
  def register
    @http = Net::HTTP.new(@hec_host, @port)
    @http.use_ssl = true
    @uri = URI.parse("https://#{@hec_host}:#{@port}/services/collector/event")
    @event_batch = Concurrent::Array.new
    @last_flush = Concurrent::AtomicReference.new(Time.now)
    # The single Net::HTTP instance is shared by every caller of #receive;
    # requests are serialized through this mutex so concurrent workers do not
    # interleave on the same connection object.
    @send_mutex = Mutex.new
  end

  # Buffers one event and flushes when the size or time threshold is reached.
  #
  # @param event [LogStash::Event] the event to forward
  def receive(event)
    format_and_add_to_batch(event)
    flush_batch if batch_full? || time_to_flush?
  end

  # Drains the buffer on shutdown. Stops at the first batch that fails so a
  # persistently unreachable endpoint cannot block shutdown forever.
  def close
    until @event_batch.empty?
      break unless flush_batch
    end
  end

  private

  # Converts the event into a HEC envelope and appends it to the buffer.
  #
  # @param event [LogStash::Event]
  def format_and_add_to_batch(event)
    event_data = event.to_hash
    event_data.delete("@version")

    # nil.to_i would silently yield 0 (epoch); fall back to the current time
    # when the event carries no @timestamp.
    timestamp = event.get("@timestamp") || Time.now

    @event_batch << {
      "time" => timestamp.to_i,
      "host" => resolve_host(event),
      "source" => resolve_source(event),
      "sourcetype" => @sourcetype != "none" ? @sourcetype : "_json",
      "index" => @index,
      "event" => event_data
    }
  end

  # Picks the HEC `host` value: the configured host when set, otherwise the
  # event's `host` field. That field may be a hash (its "name" entry is used,
  # with the local hostname as fallback when the key is absent), a scalar
  # (used as-is), or missing ("default_host").
  #
  # @param event [LogStash::Event]
  # @return [String]
  def resolve_host(event)
    return @host unless @host == "none"

    host_field = event.get("host")
    case host_field
    when nil
      "default_host"
    when Hash
      host_field.fetch("name") { Socket.gethostname } || "default_host"
    else
      host_name = host_field.to_s
      host_name.empty? ? "default_host" : host_name
    end
  end

  # Picks the HEC `source` value: the configured source when set, otherwise
  # the event's `source` field, otherwise "logstash".
  #
  # @param event [LogStash::Event]
  # @return [Object] normally a String
  def resolve_source(event)
    return @source unless @source == "none"

    event.get("source") || "logstash"
  end

  # @return [Boolean] true once the buffer holds at least `batch_size` events
  def batch_full?
    @event_batch.size >= @batch_size
  end

  # @return [Boolean] true when `flush_interval` seconds have passed since the
  #   last successful send
  def time_to_flush?
    Time.now - @last_flush.get >= @flush_interval
  end

  # Removes up to `batch_size` events from the buffer and posts them to HEC,
  # retrying on failure. Undelivered events are appended back to the buffer.
  #
  # @return [Boolean] true when the batch was delivered (or there was nothing
  #   to send), false when every attempt failed
  def flush_batch
    batch_to_send = @event_batch.slice!(0, @batch_size)
    # Another caller may have drained the buffer between the threshold check
    # and the slice; never post an empty body.
    return true if batch_to_send.nil? || batch_to_send.empty?

    request = build_request(batch_to_send)
    # The payload contains full event data, so it is only logged at debug.
    @logger.debug("Request body to be sent to Splunk: #{request.body}") if @logger.debug?

    attempts = [@retry_count, 1].max
    attempts.times do |index|
      return true if send_batch(request, batch_to_send, index + 1)

      # Pause between attempts, but not after the final one.
      sleep(1) if index + 1 < attempts
    end

    @logger.error("Failed to send batch to Splunk after #{attempts} attempts",
                  :batch_size => batch_to_send.size)
    @event_batch.concat(batch_to_send)
    false
  end

  # Builds the POST request carrying the batch as newline-separated JSON objects.
  #
  # @param batch [Array<Hash>] HEC envelopes
  # @return [Net::HTTP::Post]
  def build_request(batch)
    request = Net::HTTP::Post.new(@uri.request_uri)
    request["Authorization"] = "Splunk #{@hec_token}"
    request["Content-Type"] = "application/json"
    request.body = batch.map(&:to_json).join("\n")
    request
  end

  # Performs one send attempt and logs the outcome. Only an HTTP 200 response
  # is treated as success; any other status or a raised StandardError counts
  # as a failed attempt.
  #
  # @param request [Net::HTTP::Post]
  # @param batch [Array<Hash>] used for log context only
  # @param attempt [Integer] 1-based attempt number
  # @return [Boolean] true on HTTP 200
  def send_batch(request, batch, attempt)
    response = @send_mutex.synchronize { @http.request(request) }

    if response.code == "200"
      @logger.debug("Successfully sent batch to Splunk", :batch_size => batch.size)
      @last_flush.set(Time.now)
      true
    else
      @logger.warn("Failed to send batch to Splunk, will retry",
                   :response_code => response.code,
                   :response_body => response.body,
                   :attempt => attempt,
                   :batch_size => batch.size)
      false
    end
  rescue StandardError => e
    @logger.error("Error sending batch to Splunk, will retry",
                  :error => e.message,
                  :attempt => attempt,
                  :batch_size => batch.size)
    false
  end
end