lib/outlander/crawler.rb in outlander-0.1.0 vs lib/outlander/crawler.rb in outlander-0.2.0

- old
+ new

@@ -1,16 +1,17 @@ require 'nokogiri' -require 'logger' +require 'thread' require 'outlander/agent' require 'outlander/threads_pool' module Outlander module Crawler DEFAULT_OPTIONS = { - num_threads: 3 + num_threads: 3, + pause: 1 } class << self def included(base) base.extend ClassMethods @@ -22,13 +23,13 @@ end end module ClassMethods - attr_reader :roots, :setup, :handlers + attr_reader :roots, :handlers, :setup - def entrypoint(url, handler = :process_root) + def entry_point(url, handler = :process_root) @roots[url] = handler end def before_start(&block) @setup = block @@ -42,17 +43,17 @@ end end end def initialize(options = {}) - agent.cache_storage = options.delete(:cache_storage) - @logger = Logger.new(options.fetch(:log_to, STDOUT)) - @options = options.merge DEFAULT_OPTIONS + @cache = options.delete(:cache) + @options = DEFAULT_OPTIONS.merge options @history = {} + @mutex = Mutex.new @pool = ThreadsPool.new @options[:num_threads] self.class.roots.each do |url, handler| - enqueue url, handler + discover url, handler end end def run!(&block) @result_handler = block @@ -60,29 +61,48 @@ @pool.start end private - def record(data) - @result_handler.call data + def record(*args) + @result_handler.call *args end - def enqueue(url, handler, *args) - return if @history[url] == handler + def discover(url, handler, *args) + @mutex.synchronize { + return if @history[url] == handler + @history[url] = handler + } @pool.enqueue do begin - body = agent.get_with_cache(url) + body = fetch url do + sleep rand(@options[:pause]) + @agent.get(url).body + end instance_exec Nokogiri::HTML(body), *args, &self.class.handlers[handler.to_sym] rescue => e - @logger.error "Failed to process #{url} with ##{handler} #{e.inspect}" - else - @logger.info "Processed #{url} with ##{handler}" + puts "[ERROR] Failed to process #{url} with ##{handler} #{e.inspect}" end end - @logger.info "Enqueued #{url} for ##{handler}" end def agent - @agent ||= Agent.dup + @agent ||= Class.new(Agent) + end + + def fetch(url, &block) + if cache_enabled? + return @cache.get url if @cache.exists url + + value = block.call + @cache.set url, value + value + else + block.call + end + end + + def cache_enabled? + !!@cache end end end