Sha256: 5e95e8279118f69e2e6389785955aeae107f8920c090480213f548a0e33ab206

Contents?: true

Size: 1.96 KB

Versions: 5

Compression:

Stored size: 1.96 KB

Contents

# frozen_string_literal: true

require 'singleton'

require 'riemann/client'

module Riemann
  module Tools
    # Asynchronous, batching front-end to a Riemann::Client.
    #
    # Events pushed with #<< are stored in an in-memory queue and sent by a
    # background worker thread in batches of at most @max_bulk_size events.
    # When sending a batch raises a StandardError, that batch and everything
    # still queued are dropped, a warning is printed, and the delay applied
    # after the next failure grows exponentially (bounded by BACKOFF_TMAX).
    class RiemannClientWrapper
      attr_reader :options

      BACKOFF_TMIN = 0.5  # Minimum delay between reconnection attempts
      BACKOFF_TMAX = 30.0 # Maximum delay
      BACKOFF_FACTOR = 2

      DRAIN_POLL_INTERVAL = 0.1 # Delay between two checks while draining

      # Starts the worker thread and registers an at_exit hook that drains
      # the queue before the process terminates.
      #
      # @param options [Hash] connection settings; the keys read by this class
      #   are :host, :port, :timeout, :tcp, :tls, :tls_key, :tls_cert,
      #   :tls_ca_cert and :tls_verify
      def initialize(options)
        @options = options

        @queue = Queue.new
        @max_bulk_size = 1000
        @draining = false
        @backing_off = false # true while the worker sleeps after a failed send

        @worker = Thread.new do
          Thread.current.abort_on_exception = true
          backoff_delay = BACKOFF_TMIN

          loop do
            events = []

            # Block until at least one event is available, then opportunistically
            # grab whatever else is already queued, up to the bulk limit.
            events << @queue.pop
            events << @queue.pop while !@queue.empty? && events.size < @max_bulk_size

            client.bulk_send(events)
            backoff_delay = BACKOFF_TMIN
          rescue StandardError => e
            # Flag the backoff so that #drain does not wait for events which
            # are about to be discarded anyway.
            @backing_off = true
            sleep(backoff_delay)

            dropped_count = events.size + @queue.size
            @queue.clear
            @backing_off = false
            warn "Dropped #{dropped_count} event#{'s' if dropped_count > 1} due to #{e}"

            backoff_delay *= BACKOFF_FACTOR
            backoff_delay = BACKOFF_TMAX if backoff_delay > BACKOFF_TMAX
          end
        end

        at_exit { drain }
      end

      # Lazily builds and memoizes the underlying client.
      #
      # @return [Object] the Riemann::Client itself, or the object returned by
      #   its #tcp method when the :tcp or :tls option is set
      def client
        @client ||= begin
          r = Riemann::Client.new(
            host: options[:host],
            port: options[:port],
            timeout: options[:timeout],
            ssl: options[:tls],
            key_file: options[:tls_key],
            cert_file: options[:tls_cert],
            ca_file: options[:tls_ca_cert],
            ssl_verify: options[:tls_verify],
          )

          if options[:tcp] || options[:tls]
            r.tcp
          else
            r
          end
        end
      end

      # Queues an event for asynchronous delivery.
      #
      # @param event [Object] the event handed to the client's #bulk_send
      # @raise [RuntimeError] if #drain has already been called
      def <<(event)
        raise('Cannot queue events when draining') if @draining

        @queue << event
      end

      # Stops accepting new events and blocks until the worker has finished
      # with everything that was queued, including the batch it is currently
      # sending.
      #
      # Returns early when the worker thread is dead or is backing off after
      # a failure, since remaining events will not be delivered in those cases.
      #
      # NOTE(review): while a send is in progress this waits for as long as
      # the client's #bulk_send takes; presumably bounded by the :timeout
      # option -- confirm against the client implementation.
      def drain
        @draining = true
        sleep(DRAIN_POLL_INTERVAL) until worker_idle? || @backing_off || !@worker.alive?
      end

      private

      # The worker pops events off the queue *before* sending them, so an
      # empty queue alone does not mean the events were delivered.  The worker
      # is only idle once it is blocked again waiting on the empty queue.
      def worker_idle?
        @queue.empty? && @queue.num_waiting.positive?
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
riemann-tools-1.12.0 lib/riemann/tools/riemann_client_wrapper.rb
riemann-tools-1.11.0 lib/riemann/tools/riemann_client_wrapper.rb
riemann-tools-1.10.0 lib/riemann/tools/riemann_client_wrapper.rb
riemann-tools-1.9.1 lib/riemann/tools/riemann_client_wrapper.rb
riemann-tools-1.9.0 lib/riemann/tools/riemann_client_wrapper.rb