Sha256: 676adfb93a475313f0e64f29b60182da07939af454295e5655a46fe3addc20bb

Contents?: true

Size: 1.8 KB

Versions: 3

Compression:

Stored size: 1.8 KB

Contents

module UState
  # Periodically combines states from an index into new, aggregate states.
  #
  # Each registered fold pairs a block with a query string. A background
  # thread (started by #initialize) repeatedly queries the index for every
  # fold, hands the matching states to the block, and inserts any non-nil
  # result back into the index.
  class Aggregator
    # Default number of seconds spent on one pass over all folds.
    INTERVAL = 1

    attr_accessor :interval
    attr_accessor :folds

    # index - object that is queried via #query and appended to via #<<.
    # opts  - :interval  seconds for one pass over all folds (default INTERVAL).
    #         :server    optional object whose #log.error receives exceptions
    #                    raised while polling.
    #
    # Starts the polling thread immediately.
    def initialize(index, opts = {})
      @index = index

      @folds = {}
      @interval = opts[:interval] || INTERVAL
      @server = opts[:server]

      start
    end

    # Combines states matching query with State.average
    def average(query, *a)
      fold query do |states|
        State.average states, *a
      end
    end

    # Combines states matching query with the given block. The block
    # receives an array of states which presently match.
    #
    # Registering the same block again ORs the new query onto the one already
    # stored for that block.
    #
    # Example:
    #   fold 'service = "api % reqs/sec"' do |states|
    #     states.inject(State.new(service: 'api reqs/sec')) do |combined, state|
    #       combined.metric_f += state.metric_f
    #       combined
    #     end
    #   end
    #
    # Returns the query string now associated with the block.
    def fold(query, &block)
      existing = @folds[block]
      @folds[block] = existing ? "(#{existing}) or (#{query})" : query
    end

    # Combines states matching query with State.sum
    def sum(query, *a)
      fold query do |states|
        State.sum states, *a
      end
    end

    # Polls index for states matching each fold, applies fold, and inserts into
    # index. Runs forever in a background thread stored in @runner; errors
    # raised during a pass are logged and polling resumes after one second.
    def start
      @runner = Thread.new do
        loop do
          begin
            poll
          rescue StandardError => e
            log_error e
            sleep 1
          end
        end
      end
    end

    private

    # Runs one pass over all registered folds, spreading the pass evenly
    # across @interval seconds.
    def poll
      # Iterate over a snapshot so #fold may register new folds from another
      # thread without invalidating the iteration.
      folds = @folds.to_a

      # With nothing registered there is nothing to divide the interval by;
      # wait a full interval instead of spinning.
      if folds.empty?
        sleep @interval.to_f
        return
      end

      pause = @interval.to_f / folds.size
      folds.each do |f, query|
        matching = @index.query(Query.new(string: query))
        unless matching.empty?
          combined = f[matching]
          @index << combined if combined
        end
        sleep pause
      end
    end

    # Reports an error from the polling thread. Falls back to stderr when no
    # server was supplied so that a missing logger cannot kill the thread.
    def log_error(error)
      if @server
        @server.log.error error
      else
        warn "UState::Aggregator: #{error.class}: #{error.message}"
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
ustate-client-0.0.7 lib/ustate/aggregator.rb
ustate-client-0.0.6 lib/ustate/aggregator.rb
ustate-client-0.0.5 lib/ustate/aggregator.rb