require 'active_support/core_ext/array/extract_options'
require 'socket'
require 'msgpack'

module Ganymed

  ##
  # The Client connects to the {Sampler} and/or {Processor} and emits samples
  # and/or {Event events} respectively.
  #
  class Client

    # The {Processor} {ProcessorSocket socket}; +nil+ unless a +:processor+
    # option was given to {#initialize}.
    attr_reader :processor

    # The {Sampler} {SamplerSocket socket}; +nil+ unless a +:sampler+ option
    # was given to {#initialize}.
    attr_reader :sampler

    # Create a new client instance. A socket is only created for the
    # endpoints that have options present in +opts+.
    #
    # @param [Hash] opts  Client options.
    # @option opts [Hash] :processor  Options passed to the {ProcessorSocket}.
    # @option opts [Hash] :sampler  Options passed to the {SamplerSocket}.
    def initialize(opts)
      @processor = ProcessorSocket.new(opts[:processor]) if opts[:processor]
      @sampler = SamplerSocket.new(opts[:sampler]) if opts[:sampler]
    end

    ##
    # A simple UDPSocket wrapper.
    #
    # Inside {Client} this class shadows the standard library's Socket, which
    # therefore has to be referenced as +::Socket+.
    #
    class Socket

      # Create a new Socket instance.
      #
      # @param [Hash] opts  Socket options.
      # @option opts [String] :host  Host to connect to.
      # @option opts [Integer] :port  Port to connect to.
      # @option opts [String] :origin  Origin of Samples/Events. Defaults to
      #   the canonical (fully-qualified) name of the local host.
      # @raise [SocketError] if no +:origin+ is given and the local hostname
      #   cannot be resolved.
      def initialize(opts)
        @host, @port = opts[:host], opts[:port]
        @socket = UDPSocket.new
        @origin = opts[:origin] || canonical_hostname
      end

      # Send data to the socket.
      #
      # NOTE: this overrides +Object#send+ for instances of this class; use
      # +__send__+ when dynamic method dispatch is needed on them.
      #
      # @param [String] data  The data to send.
      # @param [Integer] flags  Socket flags.
      # @return [Integer] the result of +UDPSocket#send+ (number of bytes sent).
      def send(data, flags = 0)
        @socket.send(data, flags, @host, @port)
      end

      private

      # Resolve the canonical name of the local host.
      #
      # Uses Addrinfo.getaddrinfo with AI_CANONNAME rather than the deprecated
      # ::Socket.gethostbyname. Falls back to the plain hostname when the
      # resolver does not report a canonical name.
      #
      # @return [String] canonical hostname of this machine.
      def canonical_hostname
        hostname = ::Socket.gethostname
        info = ::Addrinfo.getaddrinfo(hostname, nil, nil, :STREAM, nil,
                                      ::Socket::AI_CANONNAME).first
        (info && info.canonname) || hostname
      end
    end

    ##
    # A {Socket} that emits samples to a {Sampler}.
    #
    class SamplerSocket < Socket

      # Emit a new sample.
      #
      # The sample is sent as three NUL-terminated strings (data source,
      # namespace, origin) followed by two big-endian doubles (timestamp,
      # value).
      #
      # @param [String, Symbol] ds  Sample data source.
      # @param [String] ns  {Event} namespace.
      # @param [Integer, Float] value  Sample value.
      # @param [Time] now  Sample timestamp.
      # @return [Integer] the result of {Socket#send}.
      def emit(ds, ns, value, now = Time.now.utc)
        # Seconds since the epoch with microsecond resolution.
        timestamp = now.to_i + (now.usec * 1e-6)
        sample = [ds.to_s, ns, @origin, timestamp, value.to_f]
        send(sample.pack("Z*Z*Z*GG"))
      end
    end

    ##
    # A {Socket} that emits {Event events} to a {Processor}.
    #
    class ProcessorSocket < Socket

      # Emit a new {Event}.
      #
      # @param [String] ns  {Event} namespace.
      # @param [Integer, Float] value  {Event} value.
      # @param [Hash] opts  Options
      # @option opts [String] :cf  Consolidation function used in this event.
      # @option opts [Time] :now  {Event} timestamp. Defaults to the current
      #   UTC time; only whole seconds are transmitted.
      # @option opts [String] :origin  {Event} origin. Defaults to this
      #   socket's origin.
      # @option opts [Array] :modifiers  Event modifiers.
      # @return [Hash] the event payload that was serialized and sent.
      def event(ns, value, opts = {})
        opts = {
          :cf => nil,
          :now => Time.now.utc,
          :origin => @origin,
          :modifiers => [],
        }.merge(opts)

        payload = {
          :_type => :event,
          :n => ns,
          :m => opts[:modifiers],
          :c => opts[:cf],
          :o => opts[:origin],
          :t => opts[:now].to_i,
          :v => value
        }

        send(payload.to_msgpack)
        payload
      end

      # Emit a metadata object to the {Processor}.
      #
      # A metadata object contains arbitrary data related to the origin. This
      # can be queried by {Websocket} clients for displaying information about
      # origins.
      #
      # @param [Hash] data  Metadata object.
      # @param [Hash] opts  Options
      # @option opts [String] :origin  Metadata origin. Defaults to this
      #   socket's origin.
      # @return [Hash] the metadata payload that was serialized and sent.
      def metadata(data, opts = {})
        opts = {
          :origin => @origin,
        }.merge(opts)

        # Named distinctly from the +data+ argument to avoid shadowing it.
        payload = {
          :_type => :metadata,
          :origin => opts[:origin],
          :data => data,
        }

        send(payload.to_msgpack)
        payload
      end
    end
  end
end