module Ganymed class Sampler ## # A DataSource processes samples from various sources. Samples are stored # to an in-memory buffer and flushed to the {Processor} once for every # tick. # # All DataSources interpolate their samples to gauge style values before # flushing to the processor, e.g. a counter (like requests, or bytes sent # over the network) is interpolated to a rate/s gauge (e.g. requests/s or # bytes/s). # class DataSource attr_reader :ticks, :buffer # Create a new DataSource with buffers for every +tick+ in +ticks+. # # @param [Array] ticks All ticks that should be stored in the buffer. # def initialize(ticks) @ticks = ticks @mutex = Mutex.new @buffer = Hash.new do |buffer, tick| buffer[tick] = Hash.new do |tick, ns| tick[ns] = Hash.new do |ns, origin| ns[origin] = [] end end end end # @group Methods for subclasses # Feed data from the source. # # @param [String] ns The namespace for this value. # @param [String] origin The origin of this value. # @param [Time] ts The timestamp of this value. # @param [Fixnum,Float] value The actual value. # @return [void] def feed(ns, origin, ts, value) raise NotImplementedError end # Flush and consolidate the buffer for the given +tick+. # # @param [Fixnum] tick The tick to flush. # @yield [ns, values] Run the block once for every namespace, passing in # all the consolidated values. # @return [void] def flush(tick, &block) raise NotImplementedError end # @endgroup # Add a value to all tick buffers. Usually called from {#feed}. # # @param [String] ns The namespace for this value. # @param [String] origin The origin of this value. # @param [Object] value Datasource specific value object. # @return [void] def add(ns, origin, value) @ticks.each do |tick| @buffer[tick][ns][origin] << value end end # Atomically create a new buffer for +tick+ and iterate over the old one. # Typically used by {#flush} to consolidate the samples. # # @param [Fixnum] tick The tick buffer to flush. # @yield [ns, origin, values] Run the block once for every namespace # and passing in all the consolidated # values. def each(tick, &block) {}.tap do |result| @mutex.synchronize do result.replace(@buffer[tick].dup) @buffer[tick].clear end end.each do |ns, origins| origins.each do |origin, values| yield ns, origin, values end end end end end end