lib/ustate/server/index.rb in ustate-client-0.0.6 vs lib/ustate/server/index.rb in ustate-client-0.0.7
- old
+ new
@@ -8,19 +8,16 @@
require 'sequel'
THREADS = 1000
BUFFER_SIZE = 10
- # Forget about states after
- EXPIRY = 1000
# Update metrics every
INSERT_RATE_INTERVAL = 5
INSERT_TIMES_INTERVAL = 5
attr_reader :db, :queue
- attr_accessor :expiry
attr_accessor :insert_rate_interval
attr_accessor :insert_times_interval
def initialize(opts = {})
@db = Sequel.sqlite
@@ -31,12 +28,10 @@
@on_state_change = []
@on_state_once = []
@on_state = []
- @expiry = opts[:expiry] || EXPIRY
-
@insert_rate_interval = opts[:insert_rate_interval] || INSERT_RATE_INTERVAL
@insert_times_interval = opts[:insert_times_interval] || INSERT_TIMES_INTERVAL
setup_db
end
@@ -50,10 +45,19 @@
dt = Time.now - t0
@insert_times << dt
@insert_rate << 1
end
+ # Removes a state from the index.
+ #
+ # Right now state is anything which responds to #host and #service.
+ # I'll probably evolve the index to support arbitrary operations on all
+ # states matching a query, but haven't thought out the API.
+ def delete(state)
+ @db[:states].filter(host: state.host, service: state.service).delete
+ end
+
def thread(s)
Thread.new do
process s
@pooltex.synchronize do
@pool.delete Thread.current
@@ -144,40 +148,10 @@
ds.all.map do |row|
row_to_state row
end
end
- # Remove states older than @expiry
- def reap
- @db[:states].filter { |s|
- s.time < (Time.now - @expiry).to_i
- }.each do |row|
- on_state_change(
- row_to_state(row),
- row_to_state(
- row.merge(
- state: 'unknown',
- description: "ustate has not heard from this service since #{Time.at(row[:time])}",
- metric_f: nil,
- time: Time.now.to_i
- )
- )
- )
- @db[:states].filter(host: row[:host], state: row[:state]).delete
- end
- end
-
- # Periodically expire old states.
- def reaper
- Thread.new do
- loop do
- sleep 1
- reap
- end
- end
- end
-
# Converts a row to a State
def row_to_state(row)
State.new(row)
end
@@ -195,10 +169,9 @@
end
def start
stop!
@pool = []
- @reaper = reaper
@insert_rate = MetricThread.new(Mtrc::Rate) do |r|
self << State.new(
service: "ustate insert rate",
state: "ok",