Sha256: 21b8d23d6ecc31d78e92086ede6d5cc02fca1cd06773309f24143f6c676026e9
Contents?: true
Size: 1.9 KB
Versions: 2
Compression:
Stored size: 1.9 KB
Contents
module Gandalf
  # A magical slave
  class Worker
    include DataMapper::Resource

    property :id,         Serial
    # Maximum number of jobs per interval
    property :max_jobs,   Integer, :default => 30
    # Sleep length in seconds, initialized to 15
    property :interval,   Integer, :default => 15
    # Number of fetch errors tolerated before a job is dropped
    property :max_errors, Integer, :length => 1, :default => 2

    belongs_to :scheduler

    # Builds the Redis-backed job queue and remembers the post class used to
    # parse fetched feeds.
    def setup(options = {:post_class => Post})
      @queue = RedisQueue.new(:key => self.id, :redis => options[:redis]) unless @queue
      @Post = options[:post_class]
    end

    # Starts a Rufus scheduler that crawls up to max_jobs queued jobs every
    # `interval` seconds.
    def run
      @crawl_scheduler = Rufus::Scheduler.start_new unless @crawl_scheduler
      @crawl_scheduler.every interval do
        crawl new_jobs(max_jobs)
      end
    end

    def stop
      @crawl_scheduler.stop
    end

    def start
      @crawl_scheduler.start
    end

    # Fetches and parses the given jobs' URLs; valid RSS feeds are saved,
    # everything else goes through error handling.
    def crawl(jobs)
      urls  = jobs.keys
      feeds = Feedzirra::Feed.fetch_and_parse(urls)
      jobs.each do |url, job|
        if feeds[url].is_a? Feedzirra::Parser::RSS
          save_posts(feeds[url], job[:id])
        else
          handle_error(job)
        end
      end
    end

    # Parses a feed into posts, tags them with the channel id, and saves them,
    # aborting the batch on a database error.
    def save_posts(feed, channel_id)
      posts = @Post.parse(feed)
      posts.each do |p|
        p.channel_id = channel_id
        p.clean!
        begin
          p.save
        rescue MysqlError
          break
        end
      end
    end

    # Increments a job's error count; jobs that reach max_errors are dumped,
    # otherwise they are requeued for another attempt.
    def handle_error(job)
      if job[:errors].is_a? Fixnum
        job[:errors] += 1
      else
        job[:errors] = 1
      end
      if job[:errors] >= max_errors
        puts job
      else
        @queue.push(job)
      end
    end

    # Number of jobs currently waiting in the queue.
    def jobs_to_do
      @queue.length
    end

    # Enqueues a collection of jobs.
    def push(jobs)
      jobs.each do |job|
        @queue.push(job)
      end
    end

    # Pops up to `count` jobs off the queue and returns them as a
    # { url => job } hash, the shape expected by #crawl.
    def new_jobs(count)
      jobs = @queue.pop_first(count)
      hash = {}
      jobs.each do |job|
        hash[job[:url]] = job
      end
      hash
    end
  end
end
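For orientation, a minimal usage sketch follows. It is not taken from the gem itself: it assumes the gem is loaded via require 'gandalf', that a Post class responding to .parse and the gem's RedisQueue are available, that DataMapper is already configured, and that a Redis server is reachable. The job hash keys (:url, :id) follow the shape that #crawl and #new_jobs expect.

# Hypothetical usage sketch, not part of the gem's stored file.
require 'redis'
require 'gandalf'   # assumed entry point for the gem

# Create and persist a worker record (Worker is a DataMapper resource).
worker = Gandalf::Worker.create(:max_jobs => 10, :interval => 60)

# Wire up the Redis-backed queue and the post class used to parse feeds.
worker.setup(:redis => Redis.new, :post_class => Post)

# Queue one feed job and start the polling loop defined in #run.
worker.push([{ :url => 'http://example.com/feed.rss', :id => 1 }])
worker.run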
Version data entries
2 entries across 2 versions & 1 rubygem
| Version | Path |
|---|---|
| gandalf-0.0.2 | lib/gandalf/worker.rb |
| gandalf-0.0.1 | lib/gandalf/worker.rb |