Sha256: a29967d2a1e033085e3f8a084a2817266f63e898d359121daa06c3189cb2dac8

Contents?: true

Size: 1.14 KB

Versions: 2

Compression:

Stored size: 1.14 KB

Contents

# Measures how long a work order spends between being stamped by a Foreman
# and being picked up by a Tradesman, and reports that latency (in
# milliseconds) as a JSON datagram to a logstash endpoint over UDP.
#
# Call QueueLatencyTracker.configure before any Tradesman processes a work
# order; Tradesman reads the logstash address and server name from it.
module QueueLatencyTracker
  # Timestamp ticks per second. 1_000_000 means timestamps are microseconds.
  GRANULARITY = 1_000_000

  # Ticks per millisecond; derived from GRANULARITY so the millisecond
  # conversion stays correct if the granularity is ever changed.
  TICKS_PER_MILLISECOND = GRANULARITY / 1_000.0

  # Stamps a work order with the time at which it was handed over for queueing.
  # NOTE(review): presumably runs on the enqueuing side - confirm with caller.
  class Foreman
    # Writes the current timestamp into work_order['headers']['queued_at'].
    #
    # @param work_order [Hash] work order; a 'headers' hash is created when absent
    # @param _channel [Object] unused
    # @return [Hash] the same (mutated) work order
    def process(work_order, _channel)
      headers = (work_order['headers'] ||= {})
      headers['queued_at'] = QueueLatencyTracker.timestamp
      work_order
    end
  end

  # Reads the 'queued_at' stamp from a work order and reports the elapsed time.
  class Tradesman
    # Reports the queue latency of the work order, if it carries a stamp.
    #
    # @param work_order [Hash] work order, normally stamped by Foreman#process
    # @param channel [Object] channel identifier included in the report
    # @return [Hash] the work order, unchanged
    def process(work_order, channel)
      headers = work_order['headers']
      queued_at = headers && headers['queued_at']
      # A work order without a stamp cannot be measured; skip reporting
      # instead of failing the work order itself.
      send_latency(queued_at, channel) if queued_at
      work_order
    end

    # Computes the latency since queued_at and sends it to logstash.
    #
    # Wall-clock time is used on purpose: the stamp travels inside the work
    # order, so a process-local monotonic clock would not be comparable.
    #
    # @param queued_at [Integer] timestamp in GRANULARITY ticks per second
    # @param channel [Object] channel identifier included in the report
    def send_latency(queued_at, channel)
      elapsed_ticks = QueueLatencyTracker.timestamp - queued_at
      logstash_send(latency_json(elapsed_ticks / TICKS_PER_MILLISECOND, channel))
    end

    # Sends one datagram to the configured logstash server and port.
    #
    # @param json [String] payload to send
    # @return [Integer] whatever UDPSocket#send returns (bytes sent)
    def logstash_send(json)
      socket = UDPSocket.new
      socket.send(json, 0, config[:logstash][:server_ip], config[:logstash][:port])
    ensure
      # Close explicitly so a socket per message does not leak descriptors.
      socket.close if socket
    end

    # Builds the JSON payload. Values are interpolated verbatim (no escaping).
    #
    # @param latency_ms [Numeric] latency in milliseconds
    # @param channel [Object] channel identifier, interpolated via to_s
    # @return [String] JSON object as a string
    def latency_json(latency_ms, channel)
      %({"server_name":"#{config[:server_name]}","queue_latency (ms)":#{latency_ms},"channel":"#{channel}"})
    end

    # @return [Hash, nil] the module-wide configuration (nil until configured)
    def config
      QueueLatencyTracker.config
    end
  end

  class << self
    # @return [Hash, nil] configuration built by .configure
    attr_reader :config

    # Stores the logstash endpoint and server name used by Tradesman.
    #
    # @param config [Hash] with :logstash_server_ip, :logstash_port, :server_name
    # @return [Hash] the normalized configuration
    def configure(config)
      @config = {
        logstash: {
          server_ip: config[:logstash_server_ip],
          port: config[:logstash_port]
        },
        server_name: config[:server_name]
      }
    end

    # @return [Integer] current wall-clock time in GRANULARITY ticks per second
    def timestamp
      (Time.now.to_f * GRANULARITY).to_i
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
nexia_worker_roulette-0.2.1 lib/worker_roulette/queue_latency_tracker.rb
nexia_worker_roulette-0.2.0 lib/worker_roulette/queue_latency_tracker.rb