Sha256: 21b8d23d6ecc31d78e92086ede6d5cc02fca1cd06773309f24143f6c676026e9

Contents?: true

Size: 1.9 KB

Versions: 2

Compression:

Stored size: 1.9 KB

Contents

module Gandalf
  # A magical slave
  class Worker
    include DataMapper::Resource

    property :id, Serial
    # Maximum number of jobs per interval
    property :max_jobs, Integer, :default => 30
    # Sleep length in seconds, initialized to 15
    property :interval, Integer, :default => 15
    property :max_errors, Integer, :length => 1, :default => 2

    belongs_to :scheduler

    def setup(options = {:post_class => Post})
      @queue = RedisQueue.new(:key => self.id, :redis => options[:redis]) unless @queue
      @Post = (options[:post_class])
    end

    def run
      @crawl_scheduler = Rufus::Scheduler.start_new unless @crawl_scheduler
      @crawl_scheduler.every interval do
        crawl new_jobs(max_jobs)
      end
    end

    def stop
      @crawl_scheduler.stop
    end

    def stop
      @crawl_scheduler.start
    end

    def crawl(jobs)
      urls = jobs.keys
      feeds = Feedzirra::Feed.fetch_and_parse(urls)
      jobs.each do |url, job|
        if feeds[url].is_a? Feedzirra::Parser::RSS 
          save_posts(feeds[url], job[:id])
        else
          handle_error(job)
        end
      end
    end

    def save_posts(feed, channel_id)
      posts = @Post.parse(feed)
      posts.each do |p|
        p.channel_id = channel_id
        p.clean!
        begin
          p.save
        rescue MysqlError => err
          break
        end
      end
    end

    def handle_error(job)
      if job[:errors].is_a? Fixnum
        job[:errors] += 1
      else
        job[:errors] = 1
      end

      if job[:errors] >= max_errors
        puts job
      else
        @queue.push(job)
      end
    end

    def jobs_to_do
      @queue.length
    end

    def push(jobs)
      jobs.each do |job|
        @queue.push(job)
      end
    end

    def new_jobs(count)
      jobs = @queue.pop_first(count)
      hash = {}
      jobs.each do |job|
        hash[job[:url]] = job
      end
      hash
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
gandalf-0.0.2 lib/gandalf/worker.rb
gandalf-0.0.1 lib/gandalf/worker.rb