require 'dm-core'
require 'redis'
require 'rufus/scheduler'

module Gandalf
  # A magical scheduler that spreads jobs from a seed table evenly across
  # a day and distributes them to the least-loaded workers.
  class Scheduler
    include DataMapper::Resource

    property :id,          Serial
    property :redis_host,  String,  :default => nil
    property :redis_db_id, Integer, :default => 0
    property :seed_table,  String
    property :seed_count,  Integer, :default => 0
    property :last_job_id, Integer, :default => 0
    # Sleep length in seconds, initialized to 60
    property :interval,    Integer, :default => 60

    has n, :workers

    attr_accessor :redis

    # Opens the Redis connection and hands it to each worker's queue.
    def setup(options = { :seed_class => Seed })
      @redis = Redis.new(:host => redis_host, :db => redis_db_id)
      @seed_class = options[:seed_class]
      workers.each { |worker| worker.setup(:redis => @redis) }
    end

    # Starts a Rufus scheduler that pushes new jobs every `interval` seconds
    # and refreshes the seed count every `10 * interval` seconds.
    def run
      scheduler = Rufus::Scheduler.start_new
      scheduler.every interval do
        execute
      end
      scheduler.every 10 * interval do
        # TODO Use dm-aggregates when the bug gets fixed.
        self.seed_count = repository.adapter.query(
          "SELECT COUNT(*) FROM #{seed_table} WHERE include_update = 1"
        ).first
        save
      end
    end

    # Fetches the next batch of jobs and distributes it across the workers.
    def execute
      jobs = new_jobs
      return if jobs.empty?
      self.last_job_id = jobs.last.id
      save
      new_loads = job_distribution(current_workload, jobs.count)
      push_jobs(jobs, new_loads)
    end

    # Number of jobs to dispatch per interval so that every seed is
    # processed once per day (86_400 seconds).
    def jobs_per_interval
      seed_count / (86_400 / interval)
    end

    # Returns up to `jobs_per_interval` seeds after `last_job_id`, wrapping
    # around to the beginning of the table when the end is reached.
    def new_jobs
      jobs = @seed_class.all(:id.gt => last_job_id,
                             :include_update => true,
                             :limit => jobs_per_interval)
      if jobs.length < jobs_per_interval
        jobs += @seed_class.all(:include_update => true,
                                :limit => jobs_per_interval - jobs.length)
      end
      jobs
    end

    # Maps each worker's id to the number of jobs still waiting in its queue.
    def current_workload
      workload = {}
      workers.each { |worker| workload[worker.id] = worker.jobs_to_do }
      workload
    end

    # Pushes slices of `jobs` to each worker according to `distribution`.
    def push_jobs(jobs, distribution)
      distribution.each do |worker_id, load|
        worker = workers.get(worker_id)
        worker.push(jobs.slice!(0, load))
      end
    end

    # Greedily assigns each job to the currently least-loaded worker and
    # returns a hash of worker id => number of jobs to push.
    # TODO Find a closed-form formula.
    def job_distribution(workload, jobs)
      workload = workload.clone
      distribution = Hash.new(0)
      jobs.times do
        min_id = workload.min_by { |_id, load| load }.first
        workload[min_id] += 1
        distribution[min_id] += 1
      end
      distribution
    end
  end
end
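
# Example usage, as a sketch rather than part of the library: it assumes a
# `Seed` model with an `include_update` flag, a `Worker` model implementing
# #setup and #push (both defined elsewhere in this codebase), a finalized
# DataMapper setup, and a Redis server reachable at the given host.
#
#   scheduler = Gandalf::Scheduler.create(
#     :redis_host => "localhost",
#     :seed_table => "seeds",
#     :interval   => 60
#   )
#   scheduler.setup
#   scheduler.run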