lib/pupa/processor.rb in pupa-0.0.7 vs lib/pupa/processor.rb in pupa-0.0.8

- old
+ new

@@ -4,38 +4,45 @@
 require 'pupa/processor/client'
 require 'pupa/processor/dependency_graph'
 require 'pupa/processor/helper'
 require 'pupa/processor/persistence'
+require 'pupa/processor/document_store'
 require 'pupa/processor/yielder'
+require 'pupa/processor/document_store/file_store'
+require 'pupa/processor/document_store/redis_store'
+
 module Pupa
   # An abstract processor class from which specific processors inherit.
   class Processor
     extend Forwardable
     include Helper

     class_attribute :tasks
     self.tasks = []

-    attr_reader :report, :client, :options
+    attr_reader :report, :store, :client, :options

     def_delegators :@logger, :debug, :info, :warn, :error, :fatal

-    # @param [String] output_dir the directory in which to dump JSON documents
-    # @param [String] cache_dir the directory in which to cache HTTP responses
+    # @param [String] output_dir the directory or Redis address
+    #   (e.g. `redis://localhost:6379`) in which to dump JSON documents
+    # @param [String] cache_dir the directory or Memcached address
+    #   (e.g. `memcached://localhost:11211`) in which to cache HTTP responses
     # @param [Integer] expires_in the cache's expiration time in seconds
+    # @param [Boolean] validate whether to validate JSON documents
     # @param [String] level the log level
     # @param [String,IO] logdev the log device
     # @param [Hash] options criteria for selecting the methods to run
-    def initialize(output_dir, cache_dir: nil, expires_in: 86400, level: 'INFO', logdev: STDOUT, options: {})
-      @output_dir = output_dir
-      @options = options
-      @level = level
-      @logger = Logger.new('pupa', level: level, logdev: logdev)
-      @client = Client.new(cache_dir: cache_dir, expires_in: expires_in, level: level)
-      @report = {}
+    def initialize(output_dir, cache_dir: nil, expires_in: 86400, validate: true, level: 'INFO', logdev: STDOUT, options: {})
+      @store = DocumentStore.new(output_dir)
+      @client = Client.new(cache_dir: cache_dir, expires_in: expires_in, level: level)
+      @logger = Logger.new('pupa', level: level, logdev: logdev)
+      @validate = validate
+      @options = options
+      @report = {}
     end

     # Retrieves and parses a document with a GET request.
     #
     # @param [String] url a URL to an HTML document
@@ -211,37 +218,35 @@
     #
     # @param [Object] object an scraped object
     # @raises [Pupa::Errors::DuplicateObjectIdError]
     def dump_scraped_object(object)
       type = object.class.to_s.demodulize.underscore
-      basename = "#{type}_#{object._id.gsub(File::SEPARATOR, '_')}.json"
-      path = File.join(@output_dir, basename)
+      name = "#{type}_#{object._id.gsub(File::SEPARATOR, '_')}.json"

-      if File.exist?(path)
+      if @store.exist?(name)
         raise Errors::DuplicateObjectIdError, "duplicate object ID: #{object._id} (was the same objected yielded twice?)"
       end

-      info {"save #{type} #{object.to_s} as #{basename}"}
+      info {"save #{type} #{object.to_s} as #{name}"}

-      File.open(path, 'w') do |f|
-        f.write(JSON.dump(object.to_h(include_foreign_objects: true)))
-      end
+      @store.write(name, object.to_h(include_foreign_objects: true))

-      begin
-        object.validate!
-      rescue JSON::Schema::ValidationError => e
-        warn {e.message}
+      if @validate
+        begin
+          object.validate!
+        rescue JSON::Schema::ValidationError => e
+          warn {e.message}
+        end
       end
     end

     # Loads scraped objects from disk.
     #
     # @return [Hash] a hash of scraped objects keyed by ID
     def load_scraped_objects
       {}.tap do |objects|
-        Dir[File.join(@output_dir, '*.json')].each do |path|
-          data = JSON.load(File.read(path))
+        @store.read_multi(@store.entries).each do |data|
           object = data['_type'].camelize.constantize.new(data)
           objects[object._id] = object
         end
       end
     end
@@ -274,19 +279,18 @@
     # For each object, map its ID to the ID of its duplicate, if any.
     #
     # @param [Hash] objects a hash of scraped objects keyed by ID
     # @return [Hash] a mapping from an object ID to the ID of its duplicate
     def build_losers_to_winners_map(objects)
+      inverse = {}
+      objects.each do |id,object|
+        (inverse[object.to_h.except(:_id)] ||= []) << id
+      end
+
       {}.tap do |map|
-        # We don't need to iterate on the last item in the hash, but skipping
-        # the last item is more effort than running the last item.
-        objects.each_with_index do |(id1,object1),index|
-          unless map.key?(id1) # Don't search for duplicates of duplicates.
-            objects.drop(index + 1).each do |id2,object2|
-              if object1 == object2
-                map[id2] = id1
-              end
-            end
+        inverse.values.each do |ids|
+          ids.drop(1).each do |id|
+            map[id] = ids[0]
           end
         end
       end
     end
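
The initializer changes above route all output through a DocumentStore, so output_dir can now name either a local directory or a Redis address, the HTTP cache can point at Memcached, and the new validate flag toggles JSON Schema validation. A minimal sketch of constructing a processor under the new signature follows; CatScraper is a hypothetical subclass, and the Redis/Memcached forms assume the corresponding optional client gems are installed.

require 'pupa'

# Hypothetical subclass; Pupa::Processor itself is abstract.
class CatScraper < Pupa::Processor
end

# File-backed output, as in 0.0.7: JSON documents are dumped into ./scraped_data.
file_backed = CatScraper.new('scraped_data', expires_in: 3600)

# Redis-backed output and a Memcached-backed HTTP cache, using the address
# forms documented in the new @param tags; JSON Schema validation is skipped.
redis_backed = CatScraper.new('redis://localhost:6379',
  cache_dir: 'memcached://localhost:11211',
  expires_in: 86400,
  validate: false,
  level: 'WARN')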
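
Note that the Processor now touches the store only through a small interface: exist?, write, entries, and read_multi, as seen in dump_scraped_object and load_scraped_objects. The sketch below is an illustrative file-backed stand-in for that interface, not the gem's DocumentStore::FileStore implementation.

require 'fileutils'
require 'json'

# Illustrative sketch only: a minimal file-backed store exposing the four
# methods the Processor calls in this diff.
class TinyFileStore
  def initialize(output_dir)
    @output_dir = output_dir
    FileUtils.mkdir_p(@output_dir)
  end

  # True if a document with this name has already been dumped.
  def exist?(name)
    File.exist?(path(name))
  end

  # Serializes a hash to JSON under the given name.
  def write(name, value)
    File.write(path(name), JSON.dump(value))
  end

  # Names of all stored documents.
  def entries
    Dir[File.join(@output_dir, '*.json')].map { |p| File.basename(p) }
  end

  # Parses and returns the documents for the given names.
  def read_multi(names)
    names.map { |name| JSON.load(File.read(path(name))) }
  end

  private

  def path(name)
    File.join(@output_dir, name)
  end
end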
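
The build_losers_to_winners_map rewrite replaces the pairwise comparison of each object against every later object (quadratic in the number of objects) with a single grouping pass: objects are bucketed by their attributes with _id removed, the first ID in each bucket "wins", and the remaining IDs map to it. A standalone sketch of the same idea, using plain attribute hashes in place of scraped objects:

# Standalone sketch of the new grouping strategy, with hypothetical data.
# Keys are object IDs, values stand in for an object's attributes minus _id.
objects = {
  'person-1' => { name: 'Ada' },
  'person-2' => { name: 'Grace' },
  'person-3' => { name: 'Ada' },   # duplicate of person-1
}

# Group IDs by their attributes in one pass...
inverse = {}
objects.each do |id, attributes|
  (inverse[attributes] ||= []) << id
end

# ...then map every ID after the first in each group to the first ("winner").
losers_to_winners = {}
inverse.each_value do |ids|
  ids.drop(1).each { |id| losers_to_winners[id] = ids[0] }
end

p losers_to_winners  # => {"person-3"=>"person-1"}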