require 'dm-core'
require 'rufus/scheduler'
require 'feedzirra'

module Gandalf
  # A magical worker that crawls feeds on a schedule.
  class Worker
    include DataMapper::Resource

    property :id,         Serial
    # Maximum number of jobs processed per interval
    property :max_jobs,   Integer, :default => 30
    # Sleep length in seconds between crawls
    property :interval,   Integer, :default => 15
    # Number of failures after which a job is dropped
    property :max_errors, Integer, :default => 2

    belongs_to :scheduler

    # Wires the worker up to its Redis-backed queue and chooses the class
    # used to parse and persist posts.
    def setup(options = {})
      @queue = RedisQueue.new(:key   => self.id,
                              :redis => options[:redis],
                              :host  => self.scheduler.redis_host)
      @post_class = options[:post_class] || Post
    end

    # Starts a Rufus scheduler that crawls pending jobs every `interval` seconds.
    def run
      @crawl_scheduler ||= Rufus::Scheduler.start_new

      # Log and re-raise exceptions raised inside scheduled jobs.
      def @crawl_scheduler.handle_exception(job, exception)
        puts exception
        raise exception
      end

      @crawl_scheduler.every interval do
        crawl new_jobs(max_jobs) if jobs_to_do > 0
      end
    end

    def stop
      @crawl_scheduler.stop
    end

    def start
      @crawl_scheduler.start
    end

    # Fetches and parses the given jobs' URLs; successfully parsed RSS feeds
    # are saved, everything else counts as an error.
    def crawl(jobs)
      urls  = jobs.keys
      feeds = Feedzirra::Feed.fetch_and_parse(urls)
      jobs.each do |url, job|
        if feeds[url].is_a? Feedzirra::Parser::RSS
          save_posts(feeds[url], job[:id])
        else
          handle_error(job)
        end
      end
    end

    # Parses posts out of a feed, tags them with the channel and saves them.
    def save_posts(feed, channel_id)
      posts = @post_class.parse(feed)
      posts.each do |post|
        post.channel_id = channel_id
        post.clean!
        begin
          post.save
        rescue MysqlError
          # Give up on the rest of this feed's posts if the database errors out.
          break
        end
      end
    end

    # Increments a job's error count and re-queues it, unless it has already
    # failed `max_errors` times, in which case it is logged and dropped.
    def handle_error(job)
      if job[:errors].is_a? Fixnum
        job[:errors] += 1
      else
        job[:errors] = 1
      end

      if job[:errors] >= max_errors
        puts job
      else
        @queue.push(job)
      end
    end

    def jobs_to_do
      @queue.length
    end

    def push(jobs)
      jobs.each { |job| @queue.push(job) }
    end

    # Pops up to `count` jobs off the queue and indexes them by URL.
    def new_jobs(count)
      jobs = @queue.pop_first(count)
      hash = {}
      jobs.each { |job| hash[job[:url]] = job }
      hash
    end
  end
end
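
# --- Usage sketch ---
# A minimal, illustrative example of driving a Worker; it is not part of the
# class above. It assumes a persisted Gandalf::Scheduler record exposing
# `redis_host`, a Redis connection from the `redis` gem, and jobs shaped like
# { :id => ..., :url => ... } (the shape crawl/new_jobs expect). The call
# `Gandalf::Scheduler.first` and the example URL are assumptions, not part of
# the original code.
#
#   require 'redis'
#
#   scheduler = Gandalf::Scheduler.first
#   worker    = Gandalf::Worker.create(:scheduler => scheduler)
#
#   worker.setup(:redis => Redis.new)
#   worker.push([{ :id => 1, :url => 'http://example.com/feed.rss' }])
#   worker.run    # crawls up to max_jobs queued feeds every `interval` seconds
#   # ...
#   worker.stop   # pauses the Rufus scheduler; `start` resumes it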