lib/ustate/server/index.rb in ustate-client-0.0.6 vs lib/ustate/server/index.rb in ustate-client-0.0.7

- old
+ new

@@ -8,19 +8,16 @@ require 'sequel' THREADS = 1000 BUFFER_SIZE = 10 - # Forget about states after - EXPIRY = 1000 # Update metrics every INSERT_RATE_INTERVAL = 5 INSERT_TIMES_INTERVAL = 5 attr_reader :db, :queue - attr_accessor :expiry attr_accessor :insert_rate_interval attr_accessor :insert_times_interval def initialize(opts = {}) @db = Sequel.sqlite @@ -31,12 +28,10 @@ @on_state_change = [] @on_state_once = [] @on_state = [] - @expiry = opts[:expiry] || EXPIRY - @insert_rate_interval = opts[:insert_rate_interval] || INSERT_RATE_INTERVAL @insert_times_interval = opts[:insert_times_interval] || INSERT_TIMES_INTERVAL setup_db end @@ -50,10 +45,19 @@ dt = Time.now - t0 @insert_times << dt @insert_rate << 1 end + # Removes a state from the index. + # + # Right now state is anything which responds to #host and #service. + # I'll probably evolve the index to support arbitrary operations on all + # states matching a query, but haven't thought out the API. + def delete(state) + @db[:states].filter(host: state.host, service: state.service).delete + end + def thread(s) Thread.new do process s @pooltex.synchronize do @pool.delete Thread.current @@ -144,40 +148,10 @@ ds.all.map do |row| row_to_state row end end - # Remove states older than @expiry - def reap - @db[:states].filter { |s| - s.time < (Time.now - @expiry).to_i - }.each do |row| - on_state_change( - row_to_state(row), - row_to_state( - row.merge( - state: 'unknown', - description: "ustate has not heard from this service since #{Time.at(row[:time])}", - metric_f: nil, - time: Time.now.to_i - ) - ) - ) - @db[:states].filter(host: row[:host], state: row[:state]).delete - end - end - - # Periodically expire old states. - def reaper - Thread.new do - loop do - sleep 1 - reap - end - end - end - # Converts a row to a State def row_to_state(row) State.new(row) end @@ -195,10 +169,9 @@ end def start stop! @pool = [] - @reaper = reaper @insert_rate = MetricThread.new(Mtrc::Rate) do |r| self << State.new( service: "ustate insert rate", state: "ok",