lib/rhcf/timeseries/redis_strategies.rb in rhcf-timeseries-1.0.2 vs lib/rhcf/timeseries/redis_strategies.rb in rhcf-timeseries-1.0.3

- old
+ new

@@ -3,11 +3,11 @@ class RedisHgetallStrategy def id 'H' end - def crunch_values(manager, subject, resolution_id, point, filter) + def crunch_values(manager, subject, resolution_id, point, filter, limit = 100) values = hgetall(manager, EVENT_POINT_TOKEN, subject, resolution_id, point) values.reject!{|event, value| !filter.match?(event) } if filter values end @@ -54,11 +54,11 @@ class RedisMgetStrategy < RedisStringBasedStrategy def id 'M' end - def crunch_values(manager, subject, resolution_id, point, filter) + def crunch_values(manager, subject, resolution_id, point, filter, limit = 100) events = events_for_subject_on(manager, subject, point, resolution_id, filter) mget(manager, EVENT_POINT_TOKEN, subject, resolution_id, point, events) end def mget(manager, k, s, r, p, es) @@ -85,33 +85,39 @@ manager.connection_to_use.smembers(key) end events end - def crunch_values(manager, subject, resolution_id, point, filter) + def crunch_values(manager, subject, resolution_id, point, filter, limit = 1000) register_lua_scripts!(manager.connection_to_use) point_prefix = [manager.prefix, EVENT_POINT_TOKEN, subject, resolution_id, point].join(NAMESPACE_SEPARATOR) set_key = [manager.prefix, EVENT_SET_TOKEN, resolution_id, point, subject].join(NAMESPACE_SEPARATOR) data = manager.connection_to_use.evalsha(evalsha_for(:mget_matching_smembers), - keys: [set_key], argv: [point_prefix, filter && filter.to_lua_pattern]) + keys: [set_key], argv: [point_prefix, filter && filter.to_lua_pattern, limit]) return {} if data.nil? result = {} + begin data.first.each_with_index do |evt, idx| value = data.last[idx].to_i result[evt] = value end + rescue + p $!, $!.message + raise + end result end def evalsha_for(sym_os_lua_script) @lua_script_register[sym_os_lua_script] || fail("Script for '#{sym_os_lua_script}' not registered") end def register_lua_scripts!(connection) + @lua_script_register ||= begin smembers_matching = <<-EOF local matches = {} for _, val in ipairs(redis.call('smembers', KEYS[1])) do @@ -121,28 +127,92 @@ end return matches EOF mget_matching_smembers = <<-EOF - local matches = {} local set_key = KEYS[1] local key_prefix = ARGV[1] local filter_pattern = ARGV[2] + local limit = tonumber(ARGV[3]) local keys = {} local keys_to_mget = {} + local function log(msg) + -- redis.call('publish', 'log', msg) + end + + local function mget_in_batches(keys_to_mget) + local step = 1024 + local results = {} + local last_end = 0 + local partial = {} + + local function mget_batch(ini , fin) + log("Getting from " .. ini .. ' to ' .. fin .. ' on a total of ' .. #keys_to_mget) + partial = redis.call('MGET', unpack(keys_to_mget, ini, fin)) + for _, value in pairs(partial) do table.insert(results, value) end + end + + for ending = step, #keys_to_mget, step do + mget_batch(last_end + 1, ending) + last_end = ending + end + + if last_end < #keys_to_mget then + mget_batch(last_end + 1, #keys_to_mget) + end + + return results; + end + + local function sort_and_limit_tuples(subjects, values) + local dictionary = {} + for i, subject in pairs(subjects) do + local value = values[i] or 0 + -- redis.call('publish', 'log', subject .. ' += ' .. value) + dictionary[subject] = (dictionary[subject] or 0) + value + end + + local tuples = {} + for subject, value in pairs(dictionary) do + -- redis.call('publish', 'log', subject .. ' = ' .. value) + table.insert(tuples, { subject, value } ) + end + + table.sort(tuples, function(a, b) return b[2] < a[2] end ) + + local new_subjects = {} + local new_counts = {} + + for i, tuple in pairs(tuples) do + if #new_subjects >= limit then break end + + local subject = tuple[1] + local value = tuple[2] + + table.insert(new_subjects, subject) + table.insert(new_counts, value) + end + + return {new_subjects, new_counts} + end + for _, val in ipairs(redis.call('smembers', set_key)) do if (filter_pattern and string.match(val, filter_pattern)) or not filter_pattern then table.insert(keys, val) table.insert(keys_to_mget, key_prefix .. '#{NAMESPACE_SEPARATOR}' .. val) end end if table.getn(keys) > 0 then - return {keys, redis.call('MGET', unpack(keys_to_mget)) } + local values = mget_in_batches(keys_to_mget) + local sorted = sort_and_limit_tuples(keys, values) + log ("Values card " .. #values .. " | keys card: " .. #keys) + return sorted + else + return {{},{}} end - return {{},{}} EOF { mget_matching_smembers: connection.script(:load, mget_matching_smembers), smembers_matching: connection.script(:load, smembers_matching) @@ -154,10 +224,10 @@ class RedisGetStrategy < RedisStringBasedStrategy def id 'G' end - def crunch_values(manager, subject, resolution_id, point, filter) + def crunch_values(manager, subject, resolution_id, point, filter, limit = 100) events = events_for_subject_on(manager, subject, point, resolution_id, filter) values = {} events.each do |event| value = get(manager, EVENT_POINT_TOKEN, subject, resolution_id, point, event) values[event] = value.to_i