lib/keen/async/storage/redis_handler.rb in keen-0.0.53 vs lib/keen/async/storage/redis_handler.rb in keen-0.1.0

- old
+ new

@@ -4,144 +4,82 @@ require 'time' module Keen module Async module Storage - class RedisHandler + class RedisHandler < Keen::Async::Storage::BaseStorageHandler # Keys # ---- - def global_key_prefix - "keen" - end - - def active_queue_key - "#{global_key_prefix}.active_queue_key" - end - - def failed_queue_key - "#{global_key_prefix}.failed_queue_key" - end - def add_to_active_queue(value) - @redis.lpush active_queue_key, value + redis.lpush active_queue_key, value if @logging - puts "added #{value} to active queue; length is now #{@redis.llen active_queue_key}" + puts "added #{value} to active queue; length is now #{redis.llen active_queue_key}" end end - def record_job(job) - add_to_active_queue JSON.generate(job) + def redis + unless @redis + @redis = Redis.new + end + + @redis end - def handle_prior_failures - # TODO consume the failed_queue and do something with it (loggly? retry? flat file?) + def count_active_queue + redis.llen active_queue_key end - def initialize(logging = false) - @redis = Redis.new - @logging = logging + def clear_active_queue + redis.del active_queue_key end - - def redis=(connection) - @redis = connection - end - def count_active_queue - @redis.llen active_queue_key - end + def get_authorized_jobs(how_many, client) - def get_collated_jobs(how_many) - - # Returns a hash of the next `how_many` jobs, indexed on project_id and then collection_name. - # - # It looks like this: - # - # collated = { - # "4f5775ad163d666a6100000e" => { - # "clicks" => [ - # Keen::Storage::Job.new({ - # :project_id => "4f5775ad163d666a6100000e", - # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9", - # :collection_name => "clicks", - # :event_body => {:user_id => "12345"}, - # }), - # Keen::Storage::Job.new({ - # :project_id => "4f5775ad163d666a6100000e", - # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9", - # :collection_name => "clicks", - # :event_body => {:user_id => "12345"}, - # }), - # Keen::Storage::Job.new({ - # :project_id => "4f5775ad163d666a6100000e", - # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9", - # :collection_name => "clicks", - # :event_body => {:user_id => "12345"}, - # }), - # ], - # "purchases" => [ - # Keen::Storage::Job.new({ - # :project_id => "4f5775ad163d666a6100000e", - # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9", - # :collection_name => "purchases", - # :event_body => {:user_id => "12345"}, - # }), - # Keen::Storage::Job.new({ - # :project_id => "4f5775ad163d666a6100000e", - # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9", - # :collection_name => "purchases", - # :event_body => {:user_id => "12345"}, - # }), - # Keen::Storage::Job.new({ - # :project_id => "4f5775ad163d666a6100000e", - # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9", - # :collection_name => "purchases", - # :event_body => {:user_id => "12345"}, - # }), - # ], - # } - # } - handle_prior_failures key = active_queue_key - jobs = [] + job_definitions = [] + skipped_job_definitions = [] #puts "doing the job #{how_many} times" - how_many.times do - this = @redis.lpop key - if this - jobs.push JSON.parse this - else - #puts "couldn't process value #{this}" + while true do + this = redis.lpop key + + # If we're out of jobs, end the loop: + if not this + break end - end + + # Parse the JSON into a job definition + job_definition = JSON.parse this + job_definition = Keen::Utils.symbolize_keys(job_definition) - collate_jobs(jobs) - end + # Make sure this client is authorized to process this job: + unless job_definition[:project_id] == client.project_id + unless job_definition[:auth_token] == client.auth_token + # We're not authorized, so skip this job. + skipped_job_definitions.push job_definition + next + end + end - def collate_jobs(queue) - collated = {} + job_definitions.push job_definition - queue.each do |job_hash| - - job = Keen::Async::Job.new(self, job_hash) - - if not collated.has_key? job.project_id - collated[job.project_id] = {} + if job_definitions.length == how_many + break end - if not collated[job.project_id].has_key? job.collection_name - collated[job.project_id][job.collection_name] = [] - end + end - collated[job.project_id][job.collection_name].push(job) + # Put the skipped jobs back on the queue. + skipped_job_definitions.each do |job_definition| + redis.lpush key, job_definition end - collated end end end end