begin
  require 'inline'
rescue LoadError
  # RubyInline is optional: without it DataSource#now is defined in plain
  # Ruby further down instead of being compiled from C.
end

module Ganymed
  class Sampler

    ##
    # A DataSource processes samples from various sources. Samples are stored
    # to an in-memory buffer using a sliding-window algorithm. Values are kept
    # for a configurable time and consolidated values can be flushed to the
    # processor at any time.
    #
    # All DataSources interpolate their samples to gauge style values before
    # flushing to the processor, e.g. a counter (like requests, or bytes sent
    # over the network) is interpolated to a rate/s gauge (e.g. requests/s or
    # bytes/s).
    #
    # The buffer is a three-level Hash that creates missing levels on first
    # access: origin => namespace => timestamp => Array of raw values.
    #
    class DataSource
      attr_reader :buffer, :window

      # Create a new DataSource.
      #
      # @param [Integer] window Sliding window size, in seconds (it is
      #                         subtracted from {#now}, which is in seconds).
      def initialize(window)
        @window = window
        @buffer = Hash.new do |origins, origin|
          origins[origin] = Hash.new do |namespaces, ns|
            namespaces[ns] = Hash.new do |samples, timestamp|
              samples[timestamp] = []
            end
          end
        end
      end

      # Feed data from the source. The value is appended to the list of
      # samples recorded for the current second.
      #
      # @param [String] ns The namespace for this value.
      # @param [String] origin The origin of this value.
      # @param [Integer,Float] value The actual value.
      # @return [void]
      def feed(ns, origin, value)
        @buffer[origin][ns][now] << value
      end

      # Flush and consolidate the buffer. Subclasses must override this.
      #
      # @yield [ns, origin, values] Run the block once for every
      #                             namespace/origin, passing in all the
      #                             consolidated values.
      # @raise [NotImplementedError] always, in this base class.
      # @return [void]
      def flush
        raise NotImplementedError, "#{self.class} must implement #flush"
      end

      # Yield values for every origin/ns combination and remove old elements
      # from the buffer (sliding window). This is used internally to iterate
      # through the values in the current window.
      #
      # Samples whose timestamp is strictly older than +now - window+ are
      # deleted before the block runs; a sample exactly +window+ seconds old
      # is still kept.
      #
      # @yield [ns, origin, ts] Run the block once for every namespace
      #                         passing in all the collected samples grouped
      #                         by timestamp.
      # @return [Hash, Enumerator] the buffer when a block is given, otherwise
      #                            an Enumerator over the same triples.
      def each(&block)
        return enum_for(:each) unless block

        oldest = now - window
        @buffer.each do |origin, namespaces|
          namespaces.each do |ns, samples|
            samples.delete_if { |timestamp, _| timestamp < oldest }
            yield ns, origin, samples
          end
        end
      end

      # DataSource#now returns the current wall-clock time as whole seconds
      # since the Unix epoch.
      #
      # Time.now.utc.to_i is way too expensive, so the value is read straight
      # from the system clock: through a small C function when RubyInline was
      # loaded, otherwise through Process.clock_gettime (no Time allocation).
      if respond_to?(:inline, true)
        inline :C do |builder|
          builder.include "<sys/time.h>"
          builder.c <<-EOC
            long now(void) {
              struct timeval tv;
              gettimeofday(&tv, NULL);
              return tv.tv_sec;
            }
          EOC
        end
      else
        def now
          Process.clock_gettime(Process::CLOCK_REALTIME, :second)
        end
      end

    end
  end
end