lib/pupa/processor.rb in pupa-0.0.8 vs lib/pupa/processor.rb in pupa-0.0.9

- old
+ new

@@ -1,9 +1,5 @@
-require 'json'
-
-require 'nokogiri'
-
 require 'pupa/processor/client'
 require 'pupa/processor/dependency_graph'
 require 'pupa/processor/helper'
 require 'pupa/processor/persistence'
 require 'pupa/processor/document_store'
@@ -28,16 +24,17 @@
     # @param [String] output_dir the directory or Redis address
     #   (e.g. `redis://localhost:6379`) in which to dump JSON documents
     # @param [String] cache_dir the directory or Memcached address
     #   (e.g. `memcached://localhost:11211`) in which to cache HTTP responses
     # @param [Integer] expires_in the cache's expiration time in seconds
+    # @param [Boolean] pipelined whether to dump JSON documents all at once
     # @param [Boolean] validate whether to validate JSON documents
     # @param [String] level the log level
     # @param [String,IO] logdev the log device
     # @param [Hash] options criteria for selecting the methods to run
-    def initialize(output_dir, cache_dir: nil, expires_in: 86400, validate: true, level: 'INFO', logdev: STDOUT, options: {})
-      @store = DocumentStore.new(output_dir)
+    def initialize(output_dir, cache_dir: nil, expires_in: 86400, pipelined: false, validate: true, level: 'INFO', logdev: STDOUT, options: {})
+      @store = DocumentStore.new(output_dir, pipelined: pipelined)
       @client = Client.new(cache_dir: cache_dir, expires_in: expires_in, level: level)
       @logger = Logger.new('pupa', level: level, logdev: logdev)
       @validate = validate
       @options = options
       @report = {}
@@ -71,10 +68,19 @@
     # @return a parsed document
     def post(url, params = {})
       client.post(url, params).body
     end
 
+    # Yields the object to the transformation task for processing, e.g. saving
+    # to disk, printing to CSV, etc.
+    #
+    # @param [Object] an object
+    # @note All the good terms are taken by Ruby: `return`, `send` and `yield`.
+    def dispatch(object)
+      Fiber.yield(object)
+    end
+
     # Adds a scraping task to Pupa.rb.
     #
     # Defines a method whose name is identical to `task_name`. This method
     # selects a method to perform the scraping task using `scraping_task_method`
     # and memoizes its return value. The return value is a lazy enumerator of
@@ -111,13 +117,15 @@
     #
     # @param [Symbol] task_name the name of the scraping task to perform
     # @return [Integer] the number of scraped objects
     def dump_scraped_objects(task_name)
       count = 0
-      send(task_name).each do |object|
-        count += 1 # we don't know the size of the enumeration
-        dump_scraped_object(object)
+      @store.pipelined do
+        send(task_name).each do |object|
+          count += 1 # we don't know the size of the enumeration
+          dump_scraped_object(object)
+        end
       end
       count
     end
 
     # Saves scraped objects to a database.
@@ -180,11 +188,11 @@
         break if objects.empty? || !progress_made
       end
 
       unless objects.empty?
-        raise Errors::UnprocessableEntity, "couldn't resolve #{objects.size}/#{size} objects:\n  #{objects.values.map{|object| JSON.dump(object.foreign_properties)}.join("\n  ")}"
+        raise Errors::UnprocessableEntity, "couldn't resolve #{objects.size}/#{size} objects:\n  #{objects.values.map{|object| MultiJson.dump(object.foreign_properties)}.join("\n  ")}"
       end
     end
 
     # Ensure that fingerprints uniquely identified objects.
     counts = {}
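The `pipelined` option introduced above threads from `Processor#initialize` into the `DocumentStore`, and `dump_scraped_objects` now wraps its scraping loop in `@store.pipelined`, letting a store buffer every document and dump them all at once. The `DocumentStore` side is not part of this diff, so the following is only a sketch of the block contract that `@store.pipelined do ... end` implies, using a hypothetical in-memory store:

    # Hypothetical in-memory store showing the assumed contract: writes issued
    # inside a pipelined block are buffered and committed in one batch.
    class MemoryStore
      attr_reader :documents

      def initialize(pipelined: false)
        @pipelined = pipelined
        @buffer = nil
        @documents = {}
      end

      # Runs the block, then dumps any buffered writes all at once. When the
      # store was built with pipelined: false, the block runs unbuffered.
      def pipelined
        return yield unless @pipelined
        @buffer = {}
        result = yield
        @documents.update(@buffer) # commit the whole batch in one step
        result
      ensure
        @buffer = nil
      end

      def write(name, value)
        (@buffer || @documents)[name] = value
      end
    end

    store = MemoryStore.new(pipelined: true)
    store.pipelined do
      store.write('person_1.json', name: 'Alice')
      store.write('person_2.json', name: 'Bob')
    end
    store.documents.keys # => ["person_1.json", "person_2.json"]

A Redis-backed store (one of the documented `output_dir` targets) could map this block onto an actual Redis pipeline; that mapping is an assumption here, not something the diff shows.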
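The new `dispatch` method is a thin wrapper over Ruby's Fiber API: a scraping task that calls `dispatch(object)` suspends until its consumer resumes the fiber and asks for the next object. A self-contained sketch of that handoff, with an invented task body for illustration:

    # Producer side: a scraping task hands each object off with dispatch.
    def dispatch(object)
      Fiber.yield(object)
    end

    scraping_task = Fiber.new do
      %w(Alice Bob).each do |name|
        dispatch(name: name) # suspends here until the consumer resumes the fiber
      end
      nil # falling off the end tells the consumer the task is exhausted
    end

    # Consumer side: the transformation task pulls objects one at a time.
    while (object = scraping_task.resume)
      puts "processing #{object[:name]}" # e.g. save to disk, print to CSV
    end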
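Further down, the unresolvable-objects error message now serializes foreign properties with `MultiJson.dump` rather than `JSON.dump`, matching the removal of `require 'json'` at the top of the file. `MultiJson` delegates to whichever JSON engine is loaded, and the call is a drop-in replacement:

    require 'multi_json'

    # Same output as JSON.dump for this input, routed through whichever adapter
    # MultiJson picks (oj, yajl-ruby, the stdlib json, ...).
    MultiJson.dump(organization_id: 'acme') # => "{\"organization_id\":\"acme\"}"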
@@ -220,16 +228,14 @@
     # @raises [Pupa::Errors::DuplicateObjectIdError]
     def dump_scraped_object(object)
       type = object.class.to_s.demodulize.underscore
       name = "#{type}_#{object._id.gsub(File::SEPARATOR, '_')}.json"
-      if @store.exist?(name)
+      if @store.write_unless_exists(name, object.to_h)
+        info {"save #{type} #{object.to_s} as #{name}"}
+      else
         raise Errors::DuplicateObjectIdError, "duplicate object ID: #{object._id} (was the same objected yielded twice?)"
       end
-
-      info {"save #{type} #{object.to_s} as #{name}"}
-
-      @store.write(name, object.to_h(include_foreign_objects: true))
 
       if @validate
         begin
           object.validate!
         rescue JSON::Schema::ValidationError => e
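`dump_scraped_object` previously did a separate `exist?` check before writing; `write_unless_exists` folds the check and the write into a single store call, and the branch is inverted so the success path logs while a duplicate ID raises. The `DocumentStore` implementation is not shown in this diff; for a filesystem backend, a sketch of the idea (a hypothetical `FileStore`, not the gem's actual code) might be:

    require 'multi_json'

    class FileStore
      def initialize(output_dir)
        @output_dir = output_dir
      end

      # Returns true if the document was written, false if it already existed.
      def write_unless_exists(name, value)
        # File::CREAT | File::EXCL makes open fail when the file already exists,
        # closing the race window a separate exist? check would leave open.
        File.open(File.join(@output_dir, name), File::CREAT | File::EXCL | File::WRONLY) do |f|
          f.write(MultiJson.dump(value))
        end
        true
      rescue Errno::EEXIST
        false
      end
    end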