lib/email_crawler.rb in email_crawler-0.0.5 vs lib/email_crawler.rb in email_crawler-0.0.6

- old
+ new

@@ -1,64 +1,99 @@
 require "thread"
 require "logger"
 require "csv"
 require "set"
+require "thread_safe"

 require_relative "email_crawler/version"
 require_relative "email_crawler/mechanize_helper"
 require_relative "email_crawler/scraper"
 require_relative "email_crawler/page_links"
 require_relative "email_crawler/email_scanner"

 module EmailCrawler
   class Runner
+    MAX_CONCURRENCY = 10
+
+    attr_writer :max_results, :max_links, :max_concurrency
+
     def initialize(google_website)
       @google_website = google_website

+      # @logger = ::Logger.new(STDOUT).tap do |logger|
       log_file = File.join(ENV["HOME"], "email-crawler.log")
       file = File.open(log_file, File::WRONLY | File::APPEND | File::CREAT)
       @logger = ::Logger.new(file).tap do |logger|
         logger.level = ENV["DEBUG"] ? Logger::INFO : Logger::ERROR
       end
+
+      yield(self)
+
+      @logger.info "max_results: #{@max_results}"
+      @logger.info "max_links: #{@max_links}"
+      @logger.info "max_concurrency: #{@max_concurrency}"
     end

-    def run(q, max_results = Scraper::MAX_RESULTS, max_links = PageLinks::MAX_LINKS)
-      urls = Scraper.new(@google_website, max_results).search_result_urls_for(q)
-      urls.each { |url, links| @logger.info "#{url}" }
+    def run(q)
+      urls = Scraper.new(@google_website, @max_results).search_result_urls_for(q)
+      urls.each { |url| @logger.info "#{url}" }

+      queue = Queue.new
+      urls.each { |url| queue.push(url) }
+      links_by_url = ThreadSafe::Array.new

-      threads = (1..urls.length).map do |i|
-        Thread.new(i, urls[i-1]) do |i, url|
-          @logger.info "[Thread ##{i}] grabbing page links for '#{url}'.."
-          Thread.current[:url] = url
-          Thread.current[:links] = PageLinks.for(url, max_links)
+      threads = (1..[urls.length, @max_concurrency].min).map do |i|
+        Thread.new(i) do |i|
+          url = begin
+                  queue.pop(true)
+                rescue ThreadError; end
+
+          while url
+            @logger.info "[Thread ##{i}] grabbing page links for '#{url}'.."
+            links = PageLinks.for(url, @max_links)
+            links_by_url << [url, links]
+
+            url = begin
+                    queue.pop(true)
+                  rescue ThreadError; end
+          end
         end
       end
-      threads.each(&:join)
-      threads.each { |thread| @logger.info "#{thread[:url]} (#{thread[:links].length} links)" }
-      links_by_url = Hash[threads.map { |thread| [thread[:url], thread[:links]] }]
+      @logger.debug "links_by_url: #{links_by_url.inspect}"

-      threads = (links_by_url).map.with_index do |arr, i|
-        Thread.new(i+1, arr.first, arr.last) do |i, url, links|
-          @logger.info "[Thread ##{i}] scanning for emails on page '#{url}' (#{links.length} links)"
-          Thread.current[:url] = url
-          Thread.current[:emails] = EmailScanner.new(url).scan(links)
+      links_by_url.each { |arr| queue.push(arr) }
+      emails_by_url = ThreadSafe::Hash.new
+      threads = (1..[links_by_url.length, MAX_CONCURRENCY].min).map do |i|
+        Thread.new(i) do |i|
+          arr = begin
+                  queue.pop(true)
+                rescue ThreadError; end
+
+          while arr
+            url, links = arr
+            @logger.info "[Thread ##{i}] scanning for emails on page '#{url}' (#{links.length} links)"
+            emails = EmailScanner.new(url).scan(links)
+            emails_by_url[url] = emails
+
+            arr = begin
+                    queue.pop(true)
+                  rescue ThreadError; end
+          end
         end
       end
-      threads.each(&:join)
+      @logger.debug "emails_by_url: #{emails_by_url.inspect}"

       read_emails = Set.new
       CSV.generate do |csv|
         csv << %w(Email Domain URL)
         csv << []

-        threads.each do |thread|
-          email_count = thread[:emails].inject(0) { |sum, arr| sum += arr.last.length }
-          @logger.info "#{thread[:url]} (#{email_count} emails)"
+        emails_by_url.each do |url, emails_by_link|
+          email_count = emails_by_link.inject(0) { |sum, arr| sum += arr.last.length }
+          @logger.info "#{url} (#{email_count} emails)"

-          url = thread[:url]
-          thread[:emails].each do |link, emails|
+          emails_by_link.each do |link, emails|
             emails.each do |email|
               csv << [email, url, link] if read_emails.add?(email)
             end
           end
         end
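
The 0.0.6 Runner moves configuration out of run's argument list: max_results, max_links and max_concurrency become attr_writers, and initialize yields the new instance so callers set them in a block. A minimal sketch of how the reworked API might be driven, assuming run returns the generated CSV string as the diff suggests (the Google host, search term and values below are illustrative only):

  require "email_crawler"

  # Hypothetical caller: configuration happens in the block yielded by
  # #initialize instead of being passed to #run.
  csv = EmailCrawler::Runner.new("https://www.google.com.br") do |runner|
    runner.max_results     = 50   # search results to collect
    runner.max_links       = 20   # internal links to follow per result
    runner.max_concurrency = 5    # worker threads per crawling phase
  end.run("ruby jobs")

  puts csv  # one Email/Domain/URL row per unique address found

Note that the block is now effectively mandatory: initialize calls yield(self) unconditionally, so constructing a Runner without a block raises LocalJumpError.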
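Both crawling phases share the same drain-the-queue worker pattern: Queue#pop(true) is the non-blocking form, which raises ThreadError once the queue is empty, so rescuing it into nil lets each worker loop until the work runs out, while the ThreadSafe::Array / ThreadSafe::Hash collectors from the thread_safe gem absorb the concurrent writes. A stripped-down sketch of that pattern under made-up names (not the gem's code):

  require "thread"

  queue = Queue.new
  %w[a b c d e].each { |item| queue.push(item) }
  results = Queue.new  # any thread-safe collector works here

  workers = Array.new(3) do |i|
    Thread.new do
      # Non-blocking pop: ThreadError means the queue is drained, so fall back to nil.
      item = (queue.pop(true) rescue nil)
      while item
        results.push([i, item.upcase])  # stand-in for the real per-item work
        item = (queue.pop(true) rescue nil)
      end
    end
  end
  workers.each(&:join)  # wait for every worker before reading the results

  puts results.size  # => 5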