module Ganymed
  class Sampler

    ##
    # A DataSource processes samples from various sources.  Samples are stored
    # in an in-memory buffer and flushed to the {Processor} once for every
    # tick.
    #
    # All DataSources interpolate their samples to gauge style values before
    # flushing to the processor, e.g. a counter (like requests, or bytes sent
    # over the network) is interpolated to a rate/s gauge (e.g. requests/s or
    # bytes/s).
    #
    # The buffer is a three-level Hash (+tick+ => +ns+ => +origin+ => Array of
    # values) whose levels are created lazily on first access. {#add} and
    # {#each} share one mutex, so writing a sample and draining a tick buffer
    # never interleave.
    #
    class DataSource

      attr_reader :ticks, :buffer

      # Create a new DataSource with buffers for every +tick+ in +ticks+.
      #
      # @param [Array] ticks  All ticks that should be stored in the buffer.
      #
      def initialize(ticks)
        @ticks = ticks
        @mutex = Mutex.new
        # Every level materialises itself on first access, so callers can
        # write @buffer[tick][ns][origin] << value without any setup.
        @buffer = Hash.new do |by_tick, tick|
          by_tick[tick] = Hash.new do |by_ns, ns|
            by_ns[ns] = Hash.new do |by_origin, origin|
              by_origin[origin] = []
            end
          end
        end
      end

      # @group Methods for subclasses

      # Feed data from the source.
      #
      # @param [String] ns  The namespace for this value.
      # @param [String] origin The origin of this value.
      # @param [Time] ts  The timestamp of this value.
      # @param [Fixnum,Float] value  The actual value.
      # @raise [NotImplementedError] always; subclasses must override.
      # @return [void]
      def feed(ns, origin, ts, value)
        raise NotImplementedError, "#{self.class} must implement #feed"
      end

      # Flush and consolidate the buffer for the given +tick+.
      #
      # @param [Fixnum] tick  The tick to flush.
      # @yield [ns, values]  Run the block once for every namespace, passing in
      #                      all the consolidated values.
      # @raise [NotImplementedError] always; subclasses must override.
      # @return [void]
      def flush(tick, &block)
        raise NotImplementedError, "#{self.class} must implement #flush"
      end

      # @endgroup

      # Add a value to all tick buffers. Usually called from {#feed}.
      #
      # The write happens under the same lock {#each} uses for draining, so a
      # value can never land in a tick buffer between its snapshot and its
      # reset. The lock is not reentrant: do not call this while already
      # holding it. Calling it from the block given to {#each} is fine, since
      # that block runs after the lock has been released.
      #
      # @param [String] ns  The namespace for this value.
      # @param [String] origin The origin of this value.
      # @param [Object] value  Datasource specific value object.
      # @return [void]
      def add(ns, origin, value)
        @mutex.synchronize do
          @ticks.each do |tick|
            @buffer[tick][ns][origin] << value
          end
        end
      end

      # Atomically create a new buffer for +tick+ and iterate over the old one.
      # Typically used by {#flush} to consolidate the samples.
      #
      # Without a block an Enumerator is returned; the buffer is then drained
      # when the Enumerator is consumed, not when this method is called.
      #
      # @param [Fixnum] tick  The tick buffer to flush.
      # @yield [ns, origin, values]  Run the block once for every namespace
      #                              and origin, passing in all the values
      #                              collected since the previous drain.
      # @return [Hash, Enumerator]  The drained buffer (+ns+ => +origin+ =>
      #                             values), or an Enumerator if no block
      #                             was given.
      def each(tick, &block)
        return enum_for(:each, tick) unless block

        drained = @mutex.synchronize do
          current = @buffer[tick]
          # A shallow copy is enough: it keeps the old per-namespace hashes,
          # while #clear only drops the top-level keys so that new samples
          # get fresh per-namespace hashes from the default proc.
          snapshot = current.dup
          current.clear
          snapshot
        end

        # Iterate outside the lock so the block may feed new samples.
        drained.each do |ns, origins|
          origins.each do |origin, values|
            yield ns, origin, values
          end
        end
      end
    end
  end
end