lib/keen/async/storage/redis_handler.rb in keen-0.0.53 vs lib/keen/async/storage/redis_handler.rb in keen-0.1.0
- old
+ new
@@ -4,144 +4,82 @@
require 'time'
module Keen
module Async
module Storage
- class RedisHandler
+ class RedisHandler < Keen::Async::Storage::BaseStorageHandler
# Keys
# ----
- def global_key_prefix
- "keen"
- end
-
- def active_queue_key
- "#{global_key_prefix}.active_queue_key"
- end
-
- def failed_queue_key
- "#{global_key_prefix}.failed_queue_key"
- end
-
def add_to_active_queue(value)
- @redis.lpush active_queue_key, value
+ redis.lpush active_queue_key, value
if @logging
- puts "added #{value} to active queue; length is now #{@redis.llen active_queue_key}"
+ puts "added #{value} to active queue; length is now #{redis.llen active_queue_key}"
end
end
- def record_job(job)
- add_to_active_queue JSON.generate(job)
+ def redis
+ unless @redis
+ @redis = Redis.new
+ end
+
+ @redis
end
- def handle_prior_failures
- # TODO consume the failed_queue and do something with it (loggly? retry? flat file?)
+ def count_active_queue
+ redis.llen active_queue_key
end
- def initialize(logging = false)
- @redis = Redis.new
- @logging = logging
+ def clear_active_queue
+ redis.del active_queue_key
end
-
- def redis=(connection)
- @redis = connection
- end
- def count_active_queue
- @redis.llen active_queue_key
- end
+ def get_authorized_jobs(how_many, client)
- def get_collated_jobs(how_many)
-
- # Returns a hash of the next `how_many` jobs, indexed on project_id and then collection_name.
- #
- # It looks like this:
- #
- # collated = {
- # "4f5775ad163d666a6100000e" => {
- # "clicks" => [
- # Keen::Storage::Job.new({
- # :project_id => "4f5775ad163d666a6100000e",
- # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9",
- # :collection_name => "clicks",
- # :event_body => {:user_id => "12345"},
- # }),
- # Keen::Storage::Job.new({
- # :project_id => "4f5775ad163d666a6100000e",
- # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9",
- # :collection_name => "clicks",
- # :event_body => {:user_id => "12345"},
- # }),
- # Keen::Storage::Job.new({
- # :project_id => "4f5775ad163d666a6100000e",
- # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9",
- # :collection_name => "clicks",
- # :event_body => {:user_id => "12345"},
- # }),
- # ],
- # "purchases" => [
- # Keen::Storage::Job.new({
- # :project_id => "4f5775ad163d666a6100000e",
- # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9",
- # :collection_name => "purchases",
- # :event_body => {:user_id => "12345"},
- # }),
- # Keen::Storage::Job.new({
- # :project_id => "4f5775ad163d666a6100000e",
- # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9",
- # :collection_name => "purchases",
- # :event_body => {:user_id => "12345"},
- # }),
- # Keen::Storage::Job.new({
- # :project_id => "4f5775ad163d666a6100000e",
- # :auth_token => "a5d4eaf432914823a94ecd7e0cb547b9",
- # :collection_name => "purchases",
- # :event_body => {:user_id => "12345"},
- # }),
- # ],
- # }
- # }
-
handle_prior_failures
key = active_queue_key
- jobs = []
+ job_definitions = []
+ skipped_job_definitions = []
#puts "doing the job #{how_many} times"
- how_many.times do
- this = @redis.lpop key
- if this
- jobs.push JSON.parse this
- else
- #puts "couldn't process value #{this}"
+ while true do
+ this = redis.lpop key
+
+ # If we're out of jobs, end the loop:
+ if not this
+ break
end
- end
+
+ # Parse the JSON into a job definition
+ job_definition = JSON.parse this
+ job_definition = Keen::Utils.symbolize_keys(job_definition)
- collate_jobs(jobs)
- end
+ # Make sure this client is authorized to process this job.
+ # Both project_id AND auth_token must match; nesting two `unless`
+ # guards would only skip when BOTH differ, letting a job with a
+ # mismatched project but matching token be processed by the wrong
+ # client.
+ unless job_definition[:project_id] == client.project_id &&
+        job_definition[:auth_token] == client.auth_token
+   # We're not authorized, so skip this job.
+   skipped_job_definitions.push job_definition
+   next
+ end
- def collate_jobs(queue)
- collated = {}
+ job_definitions.push job_definition
- queue.each do |job_hash|
-
- job = Keen::Async::Job.new(self, job_hash)
-
- if not collated.has_key? job.project_id
- collated[job.project_id] = {}
+ if job_definitions.length == how_many
+ break
end
- if not collated[job.project_id].has_key? job.collection_name
- collated[job.project_id][job.collection_name] = []
- end
+ end
- collated[job.project_id][job.collection_name].push(job)
+ # Put the skipped jobs back on the queue, re-serialized as JSON.
+ # Pushing the raw Hash would store its #inspect string in Redis,
+ # which the next pass's JSON.parse could not read back.
+ skipped_job_definitions.each do |job_definition|
+ redis.lpush key, JSON.generate(job_definition)
end
- collated
end
end
end
end