#!/usr/bin/env ruby # frozen_string_literal: true require "bundler/setup" require "benchmark/ips" require "tmpdir" require "socket" require "statsd-instrument" require "datadog/statsd" require "forwardable" require "vernier" class DatadogShim extend Forwardable def_delegator :@client, :close # This is a shim to make the Datadog client compatible with the StatsD client # interface. It's not a complete implementation, but it's enough to run the # benchmarks. # @param [Datadog::Statsd] client def initialize(client) @client = client end class NullSink def flush(blocking: false) end end def sink @sink ||= NullSink.new end def increment(stat, value = 1, tags: nil) @client.increment(stat, value: value, tags: tags) end def measure(stat, value = nil, tags: nil, &block) @client.time(stat, value: value, tags: tags, &block) end def histogram(stat, value = nil, tags: nil, &block) @client.histogram(stat, value: value, tags: tags, &block) end def gauge(stat, value, tags: nil) @client.gauge(stat, value: value, tags: tags) end def set(stat, value, tags: nil) @client.set(stat, value: value, tags: tags) end def event(title, text, tags: nil) @client.event(title, text, tags: tags) end def service_check(name, status, tags: nil) @client.service_check(name, status, tags: tags) end end def send_metrics(client) client.increment("StatsD.increment", 10) client.measure("StatsD.measure") { 1 + 1 } client.gauge("StatsD.gauge", 12.0, tags: ["foo:bar", "quc"]) end def send_metrics_high_cardinality(client) SERIES_COUNT.times do |i| tags = ["series:#{i}", "foo:bar", "baz:quc"] client.increment("StatsD.increment", 10, tags: tags) client.measure("StatsD.measure", tags: tags) { 1 + 1 } client.gauge("StatsD.gauge", 12.0, tags: tags) end end SOCKET_PATH = File.join(Dir.pwd, "tmp/metric.sock") THREAD_COUNT = Integer(ENV.fetch("THREAD_COUNT", 5)) EVENTS_PER_ITERATION = 3 ITERATIONS = Integer(ENV.fetch("ITERATIONS", 10_000)) SERIES_COUNT = Integer(ENV.fetch("SERIES_COUNT", 0)) ENABLE_PROFILING = ENV.key?("ENABLE_PROFILING") UDS_MAX_SEND_SIZE = 32_768 LOG_DIR = File.join(Dir.tmpdir, "statsd-instrument-benchmarks") FileUtils.mkdir_p(LOG_DIR) puts "Logs are stored in #{LOG_DIR}" def benchmark_implementation(name, env = {}, datadog_client = false) intermediate_results_filename = "#{Dir.tmpdir}/statsd-instrument-benchmarks/" log_filename = File.join(LOG_DIR, "#{File.basename($PROGRAM_NAME)}-#{name}.log".tr(" ", "_")) FileUtils.mkdir_p(File.dirname(intermediate_results_filename)) FileUtils.mkdir_p(File.dirname(log_filename)) # Set up an UDP listener to which we can send StatsD packets receiver = UDPSocket.new receiver.bind("localhost", 0) FileUtils.mkdir_p(File.dirname(SOCKET_PATH)) FileUtils.rm_f(SOCKET_PATH) receiver_uds = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM) receiver_uds.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true) receiver_uds.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, UDS_MAX_SEND_SIZE * THREAD_COUNT) receiver_uds.bind(Socket.pack_sockaddr_un(SOCKET_PATH)) # with UDS we have to take data out of the socket, otherwise it will fill up # and we will block writing to it (which is what we are testing) consume = Thread.new do loop do receiver_uds.recv(32768) rescue # Ignored end end log_file = File.open(log_filename, "w+", level: Logger::WARN) StatsD.logger = Logger.new(log_file) udp_client = StatsD::Instrument::Environment.new(ENV.to_h.merge( "STATSD_ADDR" => "#{receiver.addr[2]}:#{receiver.addr[1]}", "STATSD_IMPLEMENTATION" => "dogstatsd", "STATSD_ENV" => "production", ).merge(env)).client if datadog_client statsd = Datadog::Statsd.new(receiver.addr[2], receiver.addr[1], **env) udp_client = DatadogShim.new(statsd) end series = SERIES_COUNT.zero? ? 1 : SERIES_COUNT events_sent = THREAD_COUNT * EVENTS_PER_ITERATION * ITERATIONS * series puts "===== #{name} throughput (#{THREAD_COUNT} threads) - total events: #{events_sent} =====" start = Process.clock_gettime(Process::CLOCK_MONOTONIC) threads = THREAD_COUNT.times.map do Thread.new do count = ITERATIONS while (count -= 1) > 0 if SERIES_COUNT.zero? send_metrics(udp_client) else send_metrics_high_cardinality(udp_client) end end end end threads.each(&:join) udp_client.shutdown if udp_client.respond_to?(:shutdown) if datadog_client udp_client.close end duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start consume.kill receiver.close receiver_uds.close series = SERIES_COUNT.zero? ? 1 : SERIES_COUNT events_sent = THREAD_COUNT * EVENTS_PER_ITERATION * ITERATIONS * series puts "events: #{(events_sent / duration).round(1).to_s.reverse.gsub(/(\d{3})(?=\d)/, '\\1,').reverse}/s" end if ENABLE_PROFILING Vernier.start_profile(out: "tmp/benchmark_profile_udp_sync.json") end benchmark_implementation("UDP sync", "STATSD_BUFFER_CAPACITY" => "0") if ENABLE_PROFILING Vernier.stop_profile end if ENABLE_PROFILING Vernier.start_profile(out: "tmp/benchmark_profile_udp_async.json") end benchmark_implementation("UDP batched") if ENABLE_PROFILING Vernier.stop_profile end if ENABLE_PROFILING Vernier.start_profile(out: "tmp/benchmark_profile_uds_small_packet.json") end benchmark_implementation("UDS batched with small packet", "STATSD_SOCKET_PATH" => SOCKET_PATH) if ENABLE_PROFILING Vernier.stop_profile end if ENABLE_PROFILING Vernier.start_profile(out: "tmp/benchmark_profile_uds_batched_async.json") end benchmark_implementation( "UDS batched with jumbo packet", "STATSD_SOCKET_PATH" => SOCKET_PATH, "STATSD_MAX_PACKET_SIZE" => UDS_MAX_SEND_SIZE.to_s, ) if ENABLE_PROFILING Vernier.stop_profile end if ENABLE_PROFILING Vernier.start_profile(out: "tmp/benchmark_udp_batched_with_aggregation.json") end benchmark_implementation( "UDP batched with aggregation and 5 second interval", "STATSD_ENABLE_AGGREGATION" => "true", "STATSD_AGGREGATION_FLUSH_INTERVAL" => "5", ) if ENABLE_PROFILING Vernier.stop_profile end if ENABLE_PROFILING Vernier.start_profile(out: "tmp/benchmark_uds_with_aggregation.json") end benchmark_implementation( "UDS batched with aggregation and 5 second interval", "STATSD_ENABLE_AGGREGATION" => "true", "STATSD_AGGREGATION_FLUSH_INTERVAL" => "5", "STATSD_SOCKET_PATH" => SOCKET_PATH, "STATSD_MAX_PACKET_SIZE" => UDS_MAX_SEND_SIZE.to_s, ) if ENABLE_PROFILING Vernier.stop_profile end