lib/rhcf/timeseries/redis_strategies.rb in rhcf-timeseries-1.0.3 vs lib/rhcf/timeseries/redis_strategies.rb in rhcf-timeseries-2.0.0pre
- old
+ new
@@ -3,64 +3,83 @@
class RedisHgetallStrategy
def id
'H'
end
- def crunch_values(manager, subject, resolution_id, point, filter, limit = 100)
- values = hgetall(manager, EVENT_POINT_TOKEN, subject, resolution_id, point)
- values.reject!{|event, value| !filter.match?(event) } if filter
+ def crunch_values(manager, evt_filter, resolution_id, time_point, subj_filter)
+ values = hgetall(manager, EVENT_POINT_TOKEN, evt_filter, resolution_id, time_point)
+ values.reject!{|event, value| !subj_filter.match?(event) } if subj_filter
values
end
- def store_point_value(manager, subject_path, resolution_name, resolution_value, point_value, event_path)
- key = [manager.prefix, EVENT_POINT_TOKEN ,subject_path, resolution_name, resolution_value].join(NAMESPACE_SEPARATOR)
- manager.connection_to_use.hincrby(key, event_path, point_value)
- manager.connection_to_use.expire(key, DEFAULT_RESOLUTIONS_MAP[resolution_name][:ttl])
+ def store_point_value(manager, event_path, resolution_id, resolution_val, subject_path, increment)
+ key = point_prefix(manager, event_path, resolution_id, resolution_val)
+ manager.connection_to_use.hincrby(key, subject_path, increment)
+ manager.connection_to_use.expire(key, DEFAULT_RESOLUTIONS_MAP[resolution_id][:ttl])
end
def hgetall(manager, k,s,r,p)
key = [ manager.prefix, k,s,r,p].join(NAMESPACE_SEPARATOR)
manager.connection_to_use.hgetall(key).each_with_object({}) do |(_k, value), hash|
hash[_k] = value.to_i
end
end
+
+ def point_prefix(manager, evt_filter, resolution_id, time_point = nil, subj_path = nil)
+ [manager.prefix, EVENT_POINT_TOKEN, evt_filter, resolution_id, time_point, subj_path].compact.join(NAMESPACE_SEPARATOR)
+ end
+
+ def set_prefix(manager, evt_filter, resolution_id, time_point = nil)
+ [manager.prefix, EVENT_SET_TOKEN, evt_filter, resolution_id, time_point].compact.join(NAMESPACE_SEPARATOR)
+ end
+
+
+
end
class RedisStringBasedStrategy
+
+ def point_prefix(manager, evt_filter, resolution_id, time_point = nil, subj_path = nil)
+ [manager.prefix, EVENT_POINT_TOKEN, evt_filter, resolution_id, time_point, subj_path].compact.join(NAMESPACE_SEPARATOR)
+ end
+
+ def set_prefix(manager, evt_filter, resolution_id, time_point = nil)
+ [manager.prefix, EVENT_SET_TOKEN, evt_filter, resolution_id, time_point].compact.join(NAMESPACE_SEPARATOR)
+ end
+
+
def id
fail 'AbstractStrategy'
end
- def store_point_value(manager, subject_path, resolution_name, resolution_value, point_value, event_path)
- store_point_event(manager, resolution_name, resolution_value, subject_path, event_path)
- key = [manager.prefix, EVENT_POINT_TOKEN ,subject_path, resolution_name, resolution_value, event_path].join(NAMESPACE_SEPARATOR)
- manager.connection_to_use.incrby(key, point_value)
- manager.connection_to_use.expire(key, DEFAULT_RESOLUTIONS_MAP[resolution_name][:ttl])
- end
+ def store_point_value(manager, event_path, resolution_name, resolution_val, subj_path, increment)
+ set_key = set_prefix(manager, event_path, resolution_name, resolution_val)
+ counter_key = point_prefix(manager, event_path, resolution_name, resolution_val, subj_path)
- def store_point_event(manager, resolution_name, resolution_value, subject_path, event_path)
- key = [manager.prefix, EVENT_SET_TOKEN, resolution_name, resolution_value, subject_path].join(NAMESPACE_SEPARATOR)
- manager.connection_to_use.sadd(key, event_path)
- manager.connection_to_use.expire(key, DEFAULT_RESOLUTIONS_MAP[resolution_name][:ttl])
+ manager.connection_to_use.sadd(set_key, subj_path)
+ manager.connection_to_use.incrby(counter_key, increment)
+
+ manager.connection_to_use.expire(counter_key, DEFAULT_RESOLUTIONS_MAP[resolution_name][:ttl])
+ manager.connection_to_use.expire(set_key, DEFAULT_RESOLUTIONS_MAP[resolution_name][:ttl])
end
- def events_for_subject_on(manager, subject, point, resolution_id, filter)
- key = [manager.prefix, EVENT_SET_TOKEN, resolution_id, point, subject].join(NAMESPACE_SEPARATOR)
+ def events_for_subject_on(manager, evt_filter, res_point, resolution_id, subj_filter)
+ key = set_prefix(manager, evt_filter, resolution_id, res_point)
events = manager.connection_to_use.smembers(key)
- events = events.select{|event| filter.match?(event) } if filter
+ events = events.select{|event| subj_filter.match?(event) } if subj_filter
events
end
end
class RedisMgetStrategy < RedisStringBasedStrategy
def id
'M'
end
- def crunch_values(manager, subject, resolution_id, point, filter, limit = 100)
- events = events_for_subject_on(manager, subject, point, resolution_id, filter)
- mget(manager, EVENT_POINT_TOKEN, subject, resolution_id, point, events)
+ def crunch_values(manager, evt_filter, resolution_id, time_point, subj_filter)
+ events = events_for_subject_on(manager, evt_filter, time_point, resolution_id, subj_filter)
+ mget(manager, EVENT_POINT_TOKEN, evt_filter, resolution_id, time_point, events)
end
def mget(manager, k, s, r, p, es)
return {} if es.empty?
keys = es.map{|e| [manager.prefix, k, s, r, p, e].flatten.join(NAMESPACE_SEPARATOR)}
@@ -74,28 +93,43 @@
end
class RedisMgetLuaStrategy < RedisMgetStrategy
def id; 'ME'; end
- def events_for_subject_on(manager, subject, point, resolution_id, filter)
- key = [manager.prefix, EVENT_SET_TOKEN, resolution_id, point, subject].join(NAMESPACE_SEPARATOR)
- events = if filter
- manager.connection_to_use.evalsha(evalsha_for(:smembers_matching),
- keys: [key], argv: [filter.to_lua_pattern])
+ def ranking(manager, evt_filter, resolution_id, points_on_range, subj_filter, limit)
+ point_prefix = point_prefix(manager, evt_filter, resolution_id)
+ set_prefix = set_prefix(manager, evt_filter, resolution_id)
+
+ manager.connection_to_use.evalsha(evalsha_for(manager, :ranking),
+ keys: points_on_range,
+ argv: [
+ evt_filter,
+ subj_filter && subj_filter.to_lua_pattern,
+ set_prefix,
+ point_prefix,
+ limit
+ ])
+ end
+
+ def events_for_subject_on(manager, evt_filter, time_point, resolution_id, subj_filter)
+ key = set_prefix(manager, resolution_id, evt_filter, time_point)
+ events = if subj_filter
+ manager.connection_to_use.evalsha(evalsha_for(manager, :smembers_matching),
+ keys: [key], argv: [subj_filter.to_lua_pattern])
else
manager.connection_to_use.smembers(key)
end
events
end
- def crunch_values(manager, subject, resolution_id, point, filter, limit = 1000)
- register_lua_scripts!(manager.connection_to_use)
- point_prefix = [manager.prefix, EVENT_POINT_TOKEN, subject, resolution_id, point].join(NAMESPACE_SEPARATOR)
- set_key = [manager.prefix, EVENT_SET_TOKEN, resolution_id, point, subject].join(NAMESPACE_SEPARATOR)
+ def crunch_values(manager, evt_filter, resolution_id, time_point, subj_filter)
+ point_prefix = point_prefix(manager, resolution_id, evt_filter, time_point)
+ set_key = point_prefix(manager, resolution_id, evt_filter, time_point)
- data = manager.connection_to_use.evalsha(evalsha_for(:mget_matching_smembers),
- keys: [set_key], argv: [point_prefix, filter && filter.to_lua_pattern, limit])
+ data = manager.connection_to_use.evalsha(evalsha_for(manager, :mget_matching_smembers),
+ keys: [set_key],
+ argv: [point_prefix, subj_filter && subj_filter.to_lua_pattern])
return {} if data.nil?
result = {}
begin
data.first.each_with_index do |evt, idx|
@@ -108,21 +142,67 @@
end
result
end
- def evalsha_for(sym_os_lua_script)
- @lua_script_register[sym_os_lua_script] || fail("Script for '#{sym_os_lua_script}' not registered")
+ def evalsha_for(manager, sym_os_lua_script)
+ register_lua_scripts(manager.connection_to_use).fetch(sym_os_lua_script)
end
- def register_lua_scripts!(connection)
+ def register_lua_scripts(connection)
@lua_script_register ||=
begin
+ ranking_script = <<-EOF
+ local evt_filter = ARGV[1]
+ local subj_filter = ARGV[2]
+ local set_prefix = ARGV[3]
+ local point_prefix = ARGV[4]
+ local limit = tonumber(ARGV[5])
+
+ local set_keys = {}
+ for _, time in pairs(KEYS) do
+ table.insert(set_keys, set_prefix .. '|' .. time)
+ end
+
+ local all_subjects = redis.call("SUNION", unpack(set_keys))
+ local filtered_subjects = {}
+
+ if subj_filter then
+ for _, val in pairs(all_subjects) do
+ if string.match(val, subj_filter) then
+ table.insert(filtered_subjects, val)
+ end
+ end
+ else
+ filtered_subjects = all_subjects
+ end
+
+ local counter_tuples = {}
+
+ for _, subject in pairs(filtered_subjects) do
+ local my_counter_keys = {}
+
+ for _, time in pairs(KEYS) do
+ table.insert(my_counter_keys, point_prefix .. "|" .. time .. "|" .. subject)
+ end
+
+ local counter_total = 0
+ for _, val in pairs(redis.call("MGET", unpack(my_counter_keys))) do
+ counter_total = counter_total + ( tonumber(val) or 0 )
+ end
+
+ table.insert(counter_tuples, {subject, counter_total} )
+ end
+
+ table.sort(counter_tuples, function(a, b) return b[2] < a[2] end )
+ return counter_tuples
+ EOF
+
smembers_matching = <<-EOF
local matches = {}
- for _, val in ipairs(redis.call('smembers', KEYS[1])) do
+ for _, val in pairs(redis.call('smembers', KEYS[1])) do
if string.match(val, ARGV[1]) then
table.insert(matches, val)
end
end
return matches
@@ -134,27 +214,30 @@
local filter_pattern = ARGV[2]
local limit = tonumber(ARGV[3])
local keys = {}
local keys_to_mget = {}
+
local function log(msg)
- -- redis.call('publish', 'log', msg)
+ redis.call('publish', 'log', "XDEBUG: " .. msg)
end
local function mget_in_batches(keys_to_mget)
- local step = 1024
- local results = {}
+ local step = 1024
+ local results = {}
local last_end = 0
- local partial = {}
+ local partial = {}
+
local function mget_batch(ini , fin)
log("Getting from " .. ini .. ' to ' .. fin .. ' on a total of ' .. #keys_to_mget)
+
partial = redis.call('MGET', unpack(keys_to_mget, ini, fin))
for _, value in pairs(partial) do table.insert(results, value) end
end
- for ending = step, #keys_to_mget, step do
+ for ending = step, #keys_to_mget, step do
mget_batch(last_end + 1, ending)
last_end = ending
end
if last_end < #keys_to_mget then
@@ -164,81 +247,83 @@
return results;
end
local function sort_and_limit_tuples(subjects, values)
local dictionary = {}
- for i, subject in pairs(subjects) do
+ for i, evt_filter in pairs(subjects) do
local value = values[i] or 0
- -- redis.call('publish', 'log', subject .. ' += ' .. value)
- dictionary[subject] = (dictionary[subject] or 0) + value
+ dictionary[evt_filter] = (dictionary[evt_filter] or 0) + value
end
local tuples = {}
- for subject, value in pairs(dictionary) do
- -- redis.call('publish', 'log', subject .. ' = ' .. value)
- table.insert(tuples, { subject, value } )
+ for evt_filter, value in pairs(dictionary) do
+ table.insert(tuples, { evt_filter, value } )
end
table.sort(tuples, function(a, b) return b[2] < a[2] end )
local new_subjects = {}
local new_counts = {}
for i, tuple in pairs(tuples) do
if #new_subjects >= limit then break end
- local subject = tuple[1]
+ local evt_filter = tuple[1]
local value = tuple[2]
- table.insert(new_subjects, subject)
+ table.insert(new_subjects, evt_filter)
table.insert(new_counts, value)
end
return {new_subjects, new_counts}
end
+ log("SETKEY " .. set_key ) -- #.. " | KEY prefix: " .. key_prefix .. " FILTER PATTERN: " .. filter_pattern)
+-- log("SETKEY " .. set_key .. " | KEY prefix: " .. key_prefix .. " FILTER PATTERN: " .. filter_pattern)
+
for _, val in ipairs(redis.call('smembers', set_key)) do
if (filter_pattern and string.match(val, filter_pattern)) or not filter_pattern then
table.insert(keys, val)
table.insert(keys_to_mget, key_prefix .. '#{NAMESPACE_SEPARATOR}' .. val)
end
end
if table.getn(keys) > 0 then
local values = mget_in_batches(keys_to_mget)
- local sorted = sort_and_limit_tuples(keys, values)
- log ("Values card " .. #values .. " | keys card: " .. #keys)
- return sorted
+ -- local sorted = sort_and_limit_tuples(keys, values)
+ -- log ("Values card " .. #values .. " | keys card: " .. #keys)
+ -- return sorted
+ return {keys, values}
else
return {{},{}}
end
EOF
{
mget_matching_smembers: connection.script(:load, mget_matching_smembers),
- smembers_matching: connection.script(:load, smembers_matching)
+ smembers_matching: connection.script(:load, smembers_matching),
+ ranking: connection.script(:load, ranking_script)
}
end
end
end
class RedisGetStrategy < RedisStringBasedStrategy
def id
'G'
end
- def crunch_values(manager, subject, resolution_id, point, filter, limit = 100)
- events = events_for_subject_on(manager, subject, point, resolution_id, filter)
+ def crunch_values(manager, evt_filter, resolution_id, time_point, subj_filter, limit = 100)
+ events = events_for_subject_on(manager, evt_filter, time_point, resolution_id, subj_filter)
values = {}
events.each do |event|
- value = get(manager, EVENT_POINT_TOKEN, subject, resolution_id, point, event)
+ value = get(manager, point_prefix(manager, evt_filter, resolution_id, time_point))
values[event] = value.to_i
end
values
end
- def get(manager, *a_key)
- a_key = [manager.prefix, a_key].flatten.join(NAMESPACE_SEPARATOR)
+ def get(manager, a_key)
manager.connection_to_use.get(a_key)
end
end
end
end