lib/pupa/processor.rb in pupa-0.0.7 vs lib/pupa/processor.rb in pupa-0.0.8
- old
+ new
@@ -4,38 +4,45 @@
require 'pupa/processor/client'
require 'pupa/processor/dependency_graph'
require 'pupa/processor/helper'
require 'pupa/processor/persistence'
+require 'pupa/processor/document_store'
require 'pupa/processor/yielder'
+require 'pupa/processor/document_store/file_store'
+require 'pupa/processor/document_store/redis_store'
+
module Pupa
# An abstract processor class from which specific processors inherit.
class Processor
extend Forwardable
include Helper
class_attribute :tasks
self.tasks = []
- attr_reader :report, :client, :options
+ attr_reader :report, :store, :client, :options
def_delegators :@logger, :debug, :info, :warn, :error, :fatal
- # @param [String] output_dir the directory in which to dump JSON documents
- # @param [String] cache_dir the directory in which to cache HTTP responses
+ # @param [String] output_dir the directory or Redis address
+ # (e.g. `redis://localhost:6379`) in which to dump JSON documents
+ # @param [String] cache_dir the directory or Memcached address
+ # (e.g. `memcached://localhost:11211`) in which to cache HTTP responses
# @param [Integer] expires_in the cache's expiration time in seconds
+ # @param [Boolean] validate whether to validate JSON documents
# @param [String] level the log level
# @param [String,IO] logdev the log device
# @param [Hash] options criteria for selecting the methods to run
- def initialize(output_dir, cache_dir: nil, expires_in: 86400, level: 'INFO', logdev: STDOUT, options: {})
- @output_dir = output_dir
- @options = options
- @level = level
- @logger = Logger.new('pupa', level: level, logdev: logdev)
- @client = Client.new(cache_dir: cache_dir, expires_in: expires_in, level: level)
- @report = {}
+ def initialize(output_dir, cache_dir: nil, expires_in: 86400, validate: true, level: 'INFO', logdev: STDOUT, options: {})
+ @store = DocumentStore.new(output_dir)
+ @client = Client.new(cache_dir: cache_dir, expires_in: expires_in, level: level)
+ @logger = Logger.new('pupa', level: level, logdev: logdev)
+ @validate = validate
+ @options = options
+ @report = {}
end
# Retrieves and parses a document with a GET request.
#
# @param [String] url a URL to an HTML document
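The new constructor means the same processor can write documents to Redis and cache HTTP responses in Memcached simply by passing addresses instead of directories. A minimal sketch of instantiation under 0.0.8 follows; the CatProcessor subclass and the addresses are illustrative, and only the keyword arguments come from the signature above.

# Hypothetical subclass: real processors inherit from Pupa::Processor
# and define their own scraping tasks.
class CatProcessor < Pupa::Processor
end

# Dump scraped JSON documents to Redis, cache HTTP responses in Memcached
# for one hour, and skip JSON Schema validation.
processor = CatProcessor.new('redis://localhost:6379',
  cache_dir: 'memcached://localhost:11211',
  expires_in: 3600,
  validate: false,
  level: 'WARN')

# The 0.0.7 style still works: plain directories on disk.
processor = CatProcessor.new('/tmp/pupa', cache_dir: '/tmp/pupa_cache')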
@@ -211,37 +218,35 @@
#
# @param [Object] object a scraped object
# @raises [Pupa::Errors::DuplicateObjectIdError]
def dump_scraped_object(object)
type = object.class.to_s.demodulize.underscore
- basename = "#{type}_#{object._id.gsub(File::SEPARATOR, '_')}.json"
- path = File.join(@output_dir, basename)
+ name = "#{type}_#{object._id.gsub(File::SEPARATOR, '_')}.json"
- if File.exist?(path)
+ if @store.exist?(name)
raise Errors::DuplicateObjectIdError, "duplicate object ID: #{object._id} (was the same object yielded twice?)"
end
- info {"save #{type} #{object.to_s} as #{basename}"}
+ info {"save #{type} #{object.to_s} as #{name}"}
- File.open(path, 'w') do |f|
- f.write(JSON.dump(object.to_h(include_foreign_objects: true)))
- end
+ @store.write(name, object.to_h(include_foreign_objects: true))
- begin
- object.validate!
- rescue JSON::Schema::ValidationError => e
- warn {e.message}
+ if @validate
+ begin
+ object.validate!
+ rescue JSON::Schema::ValidationError => e
+ warn {e.message}
+ end
end
end
# Loads scraped objects from disk.
#
# @return [Hash] a hash of scraped objects keyed by ID
def load_scraped_objects
{}.tap do |objects|
- Dir[File.join(@output_dir, '*.json')].each do |path|
- data = JSON.load(File.read(path))
+ @store.read_multi(@store.entries).each do |data|
object = data['_type'].camelize.constantize.new(data)
objects[object._id] = object
end
end
end
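Taken together, these two methods rely on the store for only four operations: exist?, write, entries and read_multi. Below is a sketch of a file-backed object satisfying that contract; it is inferred from the calls above, not copied from the gem's FileStore or RedisStore, whose internals may differ.

require 'fileutils'
require 'json'

class SketchFileStore
  def initialize(output_dir)
    @output_dir = output_dir
    FileUtils.mkdir_p(@output_dir)
  end

  # True if a document with this name was already dumped.
  def exist?(name)
    File.exist?(path(name))
  end

  # Serialize a hash to JSON under the given name.
  def write(name, value)
    File.open(path(name), 'w') do |f|
      f.write(JSON.dump(value))
    end
  end

  # Names of all stored documents.
  def entries
    Dir.chdir(@output_dir) do
      Dir['*.json']
    end
  end

  # Parse each named document back into a hash.
  def read_multi(names)
    names.map do |name|
      JSON.load(File.read(path(name)))
    end
  end

private

  def path(name)
    File.join(@output_dir, name)
  end
end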
@@ -274,19 +279,18 @@
# For each object, map its ID to the ID of its duplicate, if any.
#
# @param [Hash] objects a hash of scraped objects keyed by ID
# @return [Hash] a mapping from an object ID to the ID of its duplicate
def build_losers_to_winners_map(objects)
+ inverse = {}
+ objects.each do |id,object|
+ (inverse[object.to_h.except(:_id)] ||= []) << id
+ end
+
{}.tap do |map|
- # We don't need to iterate on the last item in the hash, but skipping
- # the last item is more effort than running the last item.
- objects.each_with_index do |(id1,object1),index|
- unless map.key?(id1) # Don't search for duplicates of duplicates.
- objects.drop(index + 1).each do |id2,object2|
- if object1 == object2
- map[id2] = id1
- end
- end
+ inverse.values.each do |ids|
+ ids.drop(1).each do |id|
+ map[id] = ids[0]
end
end
end
end
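The rewrite replaces the old pairwise comparison, which grew quadratically with the number of scraped objects, with a single pass that groups IDs by the object's content minus its ID; within each group the first ID wins and the rest are mapped to it. A standalone sketch with plain hashes standing in for scraped objects (the data is made up, and reject stands in for the to_h.except(:_id) call used above):

objects = {
  'a' => { _id: 'a', name: 'Alice' },
  'b' => { _id: 'b', name: 'Bob' },
  'c' => { _id: 'c', name: 'Alice' },  # same content as 'a' apart from its ID
}

# Group object IDs by their content with the ID removed.
inverse = {}
objects.each do |id, object|
  (inverse[object.reject { |k, _| k == :_id }] ||= []) << id
end

# Every ID after the first in a group maps to the group's first ID.
map = {}
inverse.values.each do |ids|
  ids.drop(1).each do |id|
    map[id] = ids[0]
  end
end

map # => { 'c' => 'a' }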