Sha256: bc4eec64ae47db42a46894606bceb45e3283ed57de3ca28eb80d584884b13711

Contents?: true

Size: 1.78 KB

Versions: 8

Compression:

Stored size: 1.78 KB

Contents

require 'socket'
require 'eventmachine'
require 'logger'
require 'snappy'
require 'msgpack'

require 'metriksd/registry'

module Metriksd
  class UdpServer
    class Handler < EventMachine::Connection
      def initialize(proc)
        @proc = proc
        super
      end

      def receive_data(data)
        @proc.call(data)
      end
    end

    attr_reader :logger, :port, :host, :registry

    def initialize(registry, options = {})
      missing_keys = %w(port) - options.keys.map(&:to_s)
      unless missing_keys.empty?
        raise ArgumentError, "Missing required options: #{missing_keys * ', '}"
      end

      @registry = registry
      @port     = options[:port]
      @host     = options[:host]    || '0.0.0.0'
      @logger   = options[:logger]  || Logger.new(STDERR)
      @recvbuf  = options[:recvbuf] || 1024 * 1024

      @unpacker = MessagePack::Unpacker.new
    end
    
    def start
      unless EventMachine.reactor_running?
        Thread.new do
          EventMachine.epoll  = true if EventMachine.epoll?
          EventMachine.kqueue = true if EventMachine.kqueue?
          EventMachine.run
        end
      end

      EventMachine.next_tick do
        handler = proc do |data|
          begin
            unmarshal(data)
          rescue => e
            logger.error "Error in metriks server: #{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
          end
        end

        EventMachine.open_datagram_socket(@host, @port, Handler, handler)
      end
    end
    
    def stop
      EventMachine.stop
    end

    def join
      if EventMachine.reactor_thread?
        EventMachine.reactor_thread.join
      end
    end
    
    def unmarshal(data)
      @unpacker.feed_each(Snappy.inflate(data)) do |payload|
        @registry << Data.new(payload)
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
metriksd-0.5.7 lib/metriksd/udp_server.rb
metriksd-0.5.6 lib/metriksd/udp_server.rb
metriksd-0.5.5 lib/metriksd/udp_server.rb
metriksd-0.5.4 lib/metriksd/udp_server.rb
metriksd-0.5.3 lib/metriksd/udp_server.rb
metriksd-0.5.2 lib/metriksd/udp_server.rb
metriksd-0.5.1 lib/metriksd/udp_server.rb
metriksd-0.5.0 lib/metriksd/udp_server.rb