lib/rhcf/timeseries/redis.rb in rhcf-timeseries-0.0.1 vs lib/rhcf/timeseries/redis.rb in rhcf-timeseries-0.0.2
- old
+ new
@@ -1,293 +1,227 @@
-#!/usr/local/rvm/rubies/ruby-1.9.3-p194/bin/ruby
require 'active_support/core_ext/numeric/time'
-
-#require 'observer'
-
require 'micon'
require 'date'
-class Rhcf::Timeseries::Result
- def initialize(subject, from, to, series)
- if from > to
- raise ArgumentError, "Argument 'from' can not be bigger then 'to'"
- end
- @series = series
- @subject = subject
- @from = from
- @to = to
- end
+module Rhcf
+ module Timeseries
+ class Result
+ def initialize(subject, from, to, series)
+ if from > to
+ raise ArgumentError, "Argument 'from' can not be bigger then 'to'"
+ end
+ @series = series
+ @subject = subject
+ @from = from
+ @to = to
+ end
- def total(resolution_id=nil)
- accumulator={}
- points(resolution_id || better_resolution[:id]) do |data|
- data[:values].each do |key, value|
- accumulator[key]||=0
- accumulator[key]+=value
+ def total(resolution_id=nil)
+ accumulator={}
+ points(resolution_id || better_resolution[:id]) do |data|
+ data[:values].each do |key, value|
+ accumulator[key]||=0
+ accumulator[key]+=value
+ end
+ end
+ accumulator
end
- end
- accumulator
- end
- def points(resolution_id)
- list =[]
-
- point_range(resolution_id) do |point|
- values = {}
-
- @series.events_for_subject_on(@subject, point, resolution_id).each do |event|
- value = @series.get('point', @subject, event, resolution_id, point)
- values[event] = value.to_i
- end
+ def points(resolution_id)
+ list =[]
- next if values.empty?
- data = {moment: point, values: values }
- if block_given?
- yield data
- else
- list << data
- end
- end
- list unless block_given?
- end
+ point_range(resolution_id) do |point|
+ values = {}
- def point_range(resolution_id)
-#require 'pry-debugger';binding.pry
- resolution = @series.resolution(resolution_id)
- span = resolution[:span]
- ptr = @from.dup
- while ptr < @to
- point = @series.resolution_value_at(ptr, resolution_id)
- yield point
- ptr += span.to_i
- end
- rescue FloatDomainError
- # OK
- end
+ @series.events_for_subject_on(@subject, point, resolution_id).each do |event|
+ value = @series.get('point', @subject, event, resolution_id, point)
+ values[event] = value.to_i
+ end
- def better_resolution
- span = @to - @from
- resolutions = @series.resolutions.sort_by{|h| h[:span]}.reverse
- better = resolutions.find{|r| r[:span] < span / 5} || resolutions.last
- end
-end
-
-
-class Rhcf::Timeseries::Redis
- inject :logger
- inject :redis_connection
-
- RESOLUTIONS_MAP={
- :ever => {span:Float::INFINITY, formatter: "ever"},
- :year => {span: 365.days,formatter: "%Y"},
- :week => {span: 1.week, formatter: "%Y-CW%w"},
- :month => {span: 30.days, formatter: "%Y-%m"},
- :day => {span: 1.day, formatter: "%Y-%m-%d"},
- :hour => {span: 1.hour, formatter: "%Y-%m-%dT%H"},
- :minute => {span: 1.minute, formatter: "%Y-%m-%dT%H:%M"},
- :second => {span: 1, formatter: "%Y-%m-%dT%H:%M:%S"},
- :"5seconds" => {span: 5.seconds, formatter: ->(time){ [time.strftime("%Y-%m-%dT%H:%M:") , time.to_i % 60/5, '*',5].join('') }},
- :"5minutes" => {span: 5.minutes, formatter: ->(time){ [time.strftime("%Y-%m-%dT%H:") , (time.to_i/60) % 60/5, '*',5].join('') }},
- :"15minutes" => {span: 15.minutes, formatter: ->(time){ [time.strftime("%Y-%m-%dT%H:") , (time.to_i/60) % 60/15, '*',15].join('') }}
-
- }
- DEFAULT_RESOLUTIONS = RESOLUTIONS_MAP.keys
- DEFAULT_MAX_POINTS = 1_024
- NAMESPACE_SEPARATOR = '|'
-
-
- def initialize(options = {})
- @resolution_ids = options[:resolutions] || DEFAULT_RESOLUTIONS
- @max_points = options[:max_points] || DEFAULT_MAX_POINTS
- @prefix=self.class.name
- end
-
- def store(subject, event_point_hash, moment = Time.now)
- descend(subject) do |subject_path|
- event_point_hash.each do |event, point_value|
- descend(event) do |event_path|
- resolutions_of(moment) do |resolution_name, resolution_value|
- store_point_value(subject_path, event_path, resolution_name, resolution_value, point_value)
+ next if values.empty?
+ data = {moment: point, values: values }
+ if block_given?
+ yield data
+ else
+ list << data
end
end
+ list unless block_given?
end
- end
- end
+ def point_range(resolution_id)
+ resolution = @series.resolution(resolution_id)
+ span = resolution[:span]
+ ptr = @from.dup
+ while ptr < @to
+ point = @series.resolution_value_at(ptr, resolution_id)
+ yield point
+ ptr += span.to_i
+ end
+ rescue FloatDomainError
+ # OK
+ end
- def resolutions_of(moment)
- @resolution_ids.each do |res_id|
- yield res_id, resolution_value_at(moment, res_id)
+ def better_resolution
+ span = @to - @from
+ resolutions = @series.resolutions.sort_by{|h| h[:span]}.reverse
+ better = resolutions.find{|r| r[:span] < span / 5} || resolutions.last
+ end
end
- end
- def resolution_value_at(moment, res_id)
- time_resolution_formater = RESOLUTIONS_MAP[res_id][:formatter]
- case time_resolution_formater
- when String
- moment.strftime(time_resolution_formater)
- when Proc
- time_resolution_formater.call(moment)
- else
- raise ArgumentError, "Unexpected moment formater type #{time_resolution_formater.class}"
- end
- end
+ class Redis
+ inject :logger
+ inject :redis_connection
- def descend(path, &block)
- return if path.empty? or path == "."
- block.call(path)
- descend(File.dirname(path), &block)
- end
+ RESOLUTIONS_MAP={
+ :ever => {span:Float::INFINITY, formatter: "ever", ttl: (2 * 366).days},
+ :year => {span: 365.days,formatter: "%Y", ttl: (2 * 366).days},
+ :week => {span: 1.week, formatter: "%Y-CW%w", ttl: 90.days},
+ :month => {span: 30.days, formatter: "%Y-%m", ttl: 366.days},
+ :day => {span: 1.day, formatter: "%Y-%m-%d", ttl: 30.days},
+ :hour => {span: 1.hour, formatter: "%Y-%m-%dT%H", ttl: 24.hours},
+ :minute => {span: 1.minute, formatter: "%Y-%m-%dT%H:%M", ttl: 120.minutes},
+ :second => {span: 1, formatter: "%Y-%m-%dT%H:%M:%S", ttl: 1.hour},
+ :"5seconds" => {span: 5.seconds, formatter: ->(time){ [time.strftime("%Y-%m-%dT%H:%M:") , time.to_i % 60/5, '*',5].join('') }, ttl: 1.hour},
+ :"5minutes" => {span: 5.minutes, formatter: ->(time){ [time.strftime("%Y-%m-%dT%H:") , (time.to_i/60) % 60/5, '*',5].join('') }, ttl: 3.hour},
+ :"15minutes" => {span: 15.minutes, formatter: ->(time){ [time.strftime("%Y-%m-%dT%H:") , (time.to_i/60) % 60/15, '*',15].join('') }, ttl: 24.hours}
- def store_point_event( resolution_name, resolution_value, subject_path, event_path)
- key = [@prefix, 'event_set', resolution_name, resolution_value, subject_path].join(NAMESPACE_SEPARATOR)
- logger.debug("EVENTSET SADD #{key} -> #{event_path}")
- redis_connection.sadd(key, event_path)
- end
+ }
+ DEFAULT_RESOLUTIONS = RESOLUTIONS_MAP.keys
+ DEFAULT_MAX_POINTS = 1_024
+ NAMESPACE_SEPARATOR = '|'
- def store_point_value( subject_path, event_path, resolution_name, resolution_value, point_value)
- store_point_event(resolution_name, resolution_value, subject_path, event_path)
- key = [@prefix, 'point' ,subject_path, event_path, resolution_name, resolution_value].join(NAMESPACE_SEPARATOR)
- logger.debug("SETTING KEY #{key}")
- redis_connection.incrby(key, point_value)
- end
+ def initialize(options = {})
+ @resolution_ids = options[:resolutions] || DEFAULT_RESOLUTIONS
+ @max_points = options[:max_points] || DEFAULT_MAX_POINTS
+ @prefix = options[:prefix] || self.class.name
+ @connection_to_use = nil
+ end
- def find(subject, from, to = Time.now)
- Rhcf::Timeseries::Result.new(subject, from, to, self)
- end
+ def on_connection(conn)
+ @connection_to_use = conn
+ yield self
+ @connection_to_use = nil
+ end
- def flush!
- every_key{|a_key| delete_key(a_key)}
- end
+ def redis_connection_to_use
+ @connection_to_use || redis_connection
+ end
- def every_key(pattern=nil, &block)
- pattern = [@prefix, pattern,'*'].compact.join(NAMESPACE_SEPARATOR)
- redis_connection.keys(pattern).each do |key|
- yield key
- end
- end
+ def store(subject, event_point_hash, moment = Time.now)
+ resolutions = resolutions_of(moment)
- def delete_key(a_key)
- logger.debug("DELETING KEY #{a_key}")
- redis_connection.del(a_key)
- end
+ descend(subject) do |subject_path|
+ event_point_hash.each do |event, point_value|
+ descend(event) do |event_path|
+ resolutions.each do |res|
+ resolution_name, resolution_value = *res
+ store_point_value(subject_path, event_path, resolution_name, resolution_value, point_value)
+ end
+ end
+ end
+ end
+ end
- def keys(*a_key)
- raise "GIVEUP"
- a_key = [@prefix, a_key].flatten.join(NAMESPACE_SEPARATOR)
- logger.debug("FINDING KEY #{a_key}")
- redis_connection.keys(a_key).collect{|k| k.split(NAMESPACE_SEPARATOR)[1,1000].join(NAMESPACE_SEPARATOR) }
- end
- def get(*a_key)
- a_key = [@prefix, a_key].flatten.join(NAMESPACE_SEPARATOR)
- logger.debug("GETTING KEY #{a_key}")
- redis_connection.get(a_key)
- end
+ def resolutions_of(moment)
+ @resolution_ids.collect do |res_id|
+ [res_id, resolution_value_at(moment, res_id)]
+ end
+ end
- def resolution(id)
- res = RESOLUTIONS_MAP[id]
- raise ArgumentError, "Invalid resolution name #{id} for this time series" if res.nil?
- res.merge(:id => id)
- end
- def resolutions
- @resolution_ids.collect do |id|
- resolution(id)
- end
- end
+ def resolution_value_at(moment, res_id)
+ res_config = RESOLUTIONS_MAP[res_id]
+ if res_config.nil?
+ fail "No resolution config for id: #{res_id.class}:#{res_id}"
+ end
+ time_resolution_formater = res_config[:formatter]
+ case time_resolution_formater
+ when String
+ moment.strftime(time_resolution_formater)
+ when Proc
+ time_resolution_formater.call(moment)
+ else
+ raise ArgumentError, "Unexpected moment formater type #{time_resolution_formater.class}"
+ end
+ end
- def events_for_subject_on(subject, point, resolution_id)
- key = [@prefix, 'event_set', resolution_id, point, subject].join(NAMESPACE_SEPARATOR)
- logger.debug("EVENTSET SMEMBERS #{key}")
- redis_connection.smembers(key)
- end
-end
+ def descend(path, &block)
+ return if path.empty? or path == "."
+ block.call(path)
+ descend(File.dirname(path), &block)
+ end
-#require 'json'
-#$:.unshift File.dirname(__FILE__)
-#require 'hash_storer'
+ def store_point_event( resolution_name, resolution_value, subject_path, event_path)
+ key = [@prefix, 'event_set', resolution_name, resolution_value, subject_path].join(NAMESPACE_SEPARATOR)
+ logger.debug("EVENTSET SADD #{key} -> #{event_path}")
+ redis_connection_to_use.sadd(key, event_path)
+ end
-=begin
-class Redis
-# include Observable
- attr_reader :prefix, :scales, :max_points
-
-
- def initialize(prefix, scales, max_points, store = HashStorer.new)
- @lasts = {}
- @prefix = prefix
- @scales = {}
- @store = store
- scales.each do |s|
- @scales[s] = eval(s)
- end
- @max_point = max_points
- end
+ def store_point_value( subject_path, event_path, resolution_name, resolution_value, point_value)
+ store_point_event(resolution_name, resolution_value, subject_path, event_path)
- def persist
- @store.persist if @store.respond_to?( 'persist' )
- end
+ key = [@prefix, 'point' ,subject_path, event_path, resolution_name, resolution_value].join(NAMESPACE_SEPARATOR)
+ logger.debug("SETTING KEY #{key}")
+ redis_connection_to_use.incrby(key, point_value)
+ redis_connection_to_use.expire(key, RESOLUTIONS_MAP[resolution_name][:ttl])
+ end
+ def find(subject, from, to = Time.now)
+ Rhcf::Timeseries::Result.new(subject, from, to, self)
+ end
- def add(dimension, value, time = Time.now)
- @scales.each do |name, v|
- add_to_scale(name, dimension, value, time)
- end
- end
+ def flush!
+ every_key{|a_key| delete_key(a_key)}
+ end
- def add_to_scale(scale_name, dimension, value, time = Time.now)
- scale = @scales[scale_name]
- time = time.to_i
- scale_time = time - (time % scale)
- scale_key = [@prefix, dimension, scale_name].join('-')
- last = @store.increment(scale_key, scale_time, value, time, @lasts["#{scale_name}-#{dimension}"])
- @lasts["#{scale_name}-#{dimension}"] = last['last']
- changed
- notify_observers(scale_key, last)
- end
-end
+ def every_key(pattern=nil, &block)
+ pattern = [@prefix, pattern,'*'].compact.join(NAMESPACE_SEPARATOR)
+ redis_connection_to_use.keys(pattern).each do |key|
+ yield key
+ end
+ end
+ def delete_key(a_key)
+ logger.debug("DELETING KEY #{a_key}")
+ redis_connection_to_use.del(a_key)
+ end
-class RedisTimeSeriesUpdater
- def initialize(channel_prefix, observable, redis)
- @channel_prefix = channel_prefix
- @observable = observable
- @redis = redis
- observable.add_observer(self)
- end
+ def keys(*a_key)
+ raise "GIVEUP"
+ a_key = [@prefix, a_key].flatten.join(NAMESPACE_SEPARATOR)
+ logger.debug("FINDING KEY #{a_key}")
+ redis_connection_to_use.keys(a_key).collect{|k| k.split(NAMESPACE_SEPARATOR)[1,1000].join(NAMESPACE_SEPARATOR) }
+ end
- def update(key, data)
- key += '-realtime'
-# puts ['broadcasting', data, 'on', key].join(' ')
- @redis.publish(key, data.to_json)
- end
-end
+ def get(*a_key)
+ a_key = [@prefix, a_key].flatten.join(NAMESPACE_SEPARATOR)
+ logger.debug("GETTING KEY #{a_key}")
+ redis_connection_to_use.get(a_key)
+ end
-redis = Redis.new
-ts1 = TimeSeries.new('mymachine', ['1.hour', '15.minutes', '1.minute', '1.second', '10.seconds'], 1000)
-#ts1 = TimeSeries.new('mymachine', [ '1.second'], 1000)
-pub = RedisTimeSeriesUpdater.new('realtime', ts1, redis)
+ def resolution(id)
+ res = RESOLUTIONS_MAP[id]
+ raise ArgumentError, "Invalid resolution name #{id} for this time series" if res.nil?
+ res.merge(:id => id)
+ end
+ def resolutions
+ @resolution_ids.collect do |id|
+ resolution(id)
+ end
+ end
-alive = true
-Signal.trap('INT'){
- alive = false
-}
-
-Signal.trap('USR1'){
- pub.refresh
-}
-while alive
- sleep 1
- ts1.add('cpu_load', %x{uptime | sed 's/.*: //;s/,.*//'}.strip.to_f, Time.now)
- ts1.add('rx', %x{ifconfig eth0| grep 'RX bytes' | sed 's/ (.*//;s/.*://'}.strip.to_f,Time.now)
- ts1.add('tx', %x{ifconfig eth0| grep 'TX bytes' | sed 's/.*TX.*://;s/ .*//'}.strip.to_f,Time.now)
+ def events_for_subject_on(subject, point, resolution_id)
+ key = [@prefix, 'event_set', resolution_id, point, subject].join(NAMESPACE_SEPARATOR)
+ logger.debug("EVENTSET SMEMBERS #{key}")
+ redis_connection_to_use.smembers(key)
+ end
+ end
+ end
end
-
-ts1.persist
-=end