lib/ustate/aggregator.rb in ustate-client-0.0.4 vs lib/ustate/aggregator.rb in ustate-client-0.0.5
- old
+ new
@@ -9,18 +9,19 @@
def initialize(index, opts = {})
@index = index
@folds = {}
@interval = opts[:interval] || INTERVAL
+ @server = opts[:server]
start
end
# Combines states matching query with State.average
- def average(query, init = State.new)
+ def average(query, *a)
fold query do |states|
- State.average states, init
+ State.average states, *a
end
end
# Combines states matching query with the given block. The block
# receives an array of states which presently match.
@@ -43,27 +44,32 @@
# Polls index for states matching each fold, applies fold, and inserts into
# index.
def start
@runner = Thread.new do
loop do
- interval = (@interval.to_f / @folds.size) rescue @interval
- @folds.each do |f, query|
- matching = @index.query(Query.new(string: query))
- unless matching.empty?
- if combined = f[matching]
- @index << combined
+ begin
+ interval = (@interval.to_f / @folds.size) rescue @interval
+ @folds.each do |f, query|
+ matching = @index.query(Query.new(string: query))
+ unless matching.empty?
+ if combined = f[matching]
+ @index << combined
+ end
end
+ sleep interval
end
- sleep interval
+ rescue Exception => e
+ @server.log.error e
+ sleep 1
end
end
end
end
# Combines states matching query with State.sum
- def sum(query, init = State.new)
+ def sum(query, *a)
fold query do |states|
- State.sum states, init
+ State.sum states, *a
end
end
end
end