lib/pupa/processor.rb in pupa-0.0.8 vs lib/pupa/processor.rb in pupa-0.0.9
- old
+ new
@@ -1,9 +1,5 @@
-require 'json'
-
-require 'nokogiri'
-
require 'pupa/processor/client'
require 'pupa/processor/dependency_graph'
require 'pupa/processor/helper'
require 'pupa/processor/persistence'
require 'pupa/processor/document_store'
@@ -28,16 +24,17 @@
# @param [String] output_dir the directory or Redis address
# (e.g. `redis://localhost:6379`) in which to dump JSON documents
# @param [String] cache_dir the directory or Memcached address
# (e.g. `memcached://localhost:11211`) in which to cache HTTP responses
# @param [Integer] expires_in the cache's expiration time in seconds
+ # @param [Boolean] pipelined whether to dump JSON documents all at once
# @param [Boolean] validate whether to validate JSON documents
# @param [String] level the log level
# @param [String,IO] logdev the log device
# @param [Hash] options criteria for selecting the methods to run
- def initialize(output_dir, cache_dir: nil, expires_in: 86400, validate: true, level: 'INFO', logdev: STDOUT, options: {})
- @store = DocumentStore.new(output_dir)
+ def initialize(output_dir, cache_dir: nil, expires_in: 86400, pipelined: false, validate: true, level: 'INFO', logdev: STDOUT, options: {})
+ @store = DocumentStore.new(output_dir, pipelined: pipelined)
@client = Client.new(cache_dir: cache_dir, expires_in: expires_in, level: level)
@logger = Logger.new('pupa', level: level, logdev: logdev)
@validate = validate
@options = options
@report = {}
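
The new `pipelined` keyword defaults to `false`, so existing callers are unaffected. For illustration, it is passed like any other option when constructing a processor; the `Cat` subclass and server addresses below are hypothetical:

    require 'pupa'

    class Cat < Pupa::Processor
    end

    # Buffer JSON document writes and dump them all at once.
    processor = Cat.new('redis://localhost:6379',
      cache_dir: 'memcached://localhost:11211',
      expires_in: 86400,
      pipelined: true)
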
@@ -71,10 +68,19 @@
# @return a parsed document
def post(url, params = {})
client.post(url, params).body
end

+  # Yields the object to the transformation task for processing, e.g. saving
+ # to disk, printing to CSV, etc.
+ #
+  # @param [Object] object an object
+ # @note All the good terms are taken by Ruby: `return`, `send` and `yield`.
+ def dispatch(object)
+ Fiber.yield(object)
+ end
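
A scraping task calls `dispatch` once per scraped object; because the task body runs inside a Fiber, each call suspends the task until the consumer pulls the next object from the lazy enumerator described below. A minimal sketch, assuming a hypothetical `Cat` subclass and endpoint:

    class Cat < Pupa::Processor
      def scrape_people
        doc = get('https://api.example.com/people')  # hypothetical endpoint
        doc['results'].each do |result|
          person = Pupa::Person.new(name: result['name'])
          dispatch(person)  # suspends here until the consumer resumes
        end
      end
    end
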
+
# Adds a scraping task to Pupa.rb.
#
# Defines a method whose name is identical to `task_name`. This method
# selects a method to perform the scraping task using `scraping_task_method`
# and memoizes its return value. The return value is a lazy enumerator of
@@ -111,13 +117,15 @@
#
# @param [Symbol] task_name the name of the scraping task to perform
# @return [Integer] the number of scraped objects
def dump_scraped_objects(task_name)
count = 0
- send(task_name).each do |object|
- count += 1 # we don't know the size of the enumeration
- dump_scraped_object(object)
+ @store.pipelined do
+ send(task_name).each do |object|
+ count += 1 # we don't know the size of the enumeration
+ dump_scraped_object(object)
+ end
end
count
end
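
Wrapping the loop in `@store.pipelined` is what makes the new option effective: when pipelining is on, writes issued inside the block can be buffered and flushed in one batch. The implementation lives in `document_store.rb`, which this diff does not show; a plausible sketch, assuming a Redis-style backend:

    # Hypothetical sketch of DocumentStore#pipelined, not the gem's actual code.
    def pipelined
      if @pipelined && @redis        # assumed flag and connection
        @redis.pipelined { yield }   # queue commands, send them at once
      else
        yield                        # e.g. filesystem store: nothing to batch
      end
    end
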

  # Saves scraped objects to a database.
@@ -180,11 +188,11 @@
break if objects.empty? || !progress_made
end
unless objects.empty?
- raise Errors::UnprocessableEntity, "couldn't resolve #{objects.size}/#{size} objects:\n #{objects.values.map{|object| JSON.dump(object.foreign_properties)}.join("\n ")}"
+ raise Errors::UnprocessableEntity, "couldn't resolve #{objects.size}/#{size} objects:\n #{objects.values.map{|object| MultiJson.dump(object.foreign_properties)}.join("\n ")}"
end
end
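
`MultiJson.dump` replaces `JSON.dump` here (matching the dropped `require 'json'` at the top of the file); MultiJson delegates to whichever JSON backend is available, such as Oj or the stdlib, and the call is a drop-in replacement:

    require 'multi_json'

    MultiJson.dump(name: 'Alice')        # => "{\"name\":\"Alice\"}"
    MultiJson.load('{"name":"Alice"}')   # => {"name"=>"Alice"}
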

  # Ensure that fingerprints uniquely identified objects.
counts = {}
@@ -220,16 +228,14 @@
# @raises [Pupa::Errors::DuplicateObjectIdError]
def dump_scraped_object(object)
type = object.class.to_s.demodulize.underscore
name = "#{type}_#{object._id.gsub(File::SEPARATOR, '_')}.json"
- if @store.exist?(name)
+ if @store.write_unless_exists(name, object.to_h)
+ info {"save #{type} #{object.to_s} as #{name}"}
+ else
      raise Errors::DuplicateObjectIdError, "duplicate object ID: #{object._id} (was the same object yielded twice?)"
end
-
- info {"save #{type} #{object.to_s} as #{name}"}
-
- @store.write(name, object.to_h(include_foreign_objects: true))

    if @validate
begin
object.validate!
rescue JSON::Schema::ValidationError => e
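
The refactored branch implies a contract for `write_unless_exists`: write the document and return a truthy value only when `name` is not already in the store, collapsing the old `exist?` check and `write` call into one operation. A sketch of that contract, assuming it is defined on `DocumentStore` (not shown in this diff):

    # Hypothetical sketch, not the gem's actual implementation.
    def write_unless_exists(name, value)
      return false if exist?(name)
      write(name, value)
      true
    end

On a Redis backend the same contract maps naturally onto SETNX, which performs the check and the write as a single atomic command.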