Sha256: ac70b3b5be04e35b3a0a86f96cad2ecaf15028b667edd81a02c3b60b5a53596e
Contents?: true
Size: 1.83 KB
Versions: 1
Compression:
Stored size: 1.83 KB
Contents
require 'page_by_page/enum'
require 'page_by_page/mutex_enum'
require 'page_by_page/common'
require 'erb'

module PageByPage
  # Fetches a sequence of numbered pages whose URLs come from an ERB template
  # and collects the nodes matching a CSS selector (+@selector+) on each page.
  #
  # +url+, +from+, +step+ and +threads+ are defined here. Other settings read
  # below (+@selector+, +@interval+, +@progress+, +limit+, +parse+,
  # +update_progress+) presumably come from +Common+, which is not visible
  # here — TODO confirm.
  class Fetch
    include Common

    # @param opt [Hash] forwarded unchanged to Common#initialize via +super+
    # @param block [Proc] forwarded unchanged to Common#initialize via +super+
    #   (presumably evaluated as the configuration DSL — confirm in Common)
    def initialize(opt = {}, &block)
      @from = 1
      @step = 1
      @to = Float::INFINITY
      super
      # Built after +super+ so any +threads+/+from+/+step+ call made during
      # configuration is already reflected; with threads, a mutex-guarded
      # enumerator is used because all workers share this one instance.
      @enum = (defined?(@threads) ? MutexEnum : Enum).new(enum_options)
    end

    # Sets the page URL template. It is rendered with the current page number
    # available as the local variable +n+, e.g. "https://x/?page=<%= n %>".
    def url(tmpl)
      @tmpl = ERB.new tmpl
    end

    # First page number to fetch (default 1).
    def from(n)
      @from = n
    end

    # Increment between consecutive page numbers (default 1).
    def step(n)
      @step = n
    end

    # Number of worker threads; when set, +process+ fetches pages concurrently.
    def threads(n)
      @threads = n
    end

    # Fetches all pages and returns every matched node in a single flat
    # array, ordered by page number.
    # @return [Array]
    def process
      nodes_2d = defined?(@threads) ? parallel_fetch : _fetch
      puts if @progress # presumably terminates the progress output line
      nodes_2d.compact.flatten
    end

    # Lazily yields matched nodes one at a time, page by page, on the calling
    # thread, without collecting them into an array.
    # @return [Enumerator]
    def iterator
      Enumerator.new do |yielder|
        items_enum.each do |_page_num, items|
          items.each { |item| yielder << item }
        end
      end
    end

    protected

    # Drains +items_enum+ into an array indexed by page number; indices for
    # pages this call did not fetch stay nil.
    def _fetch
      pages = []
      items_enum.each do |page_num, items|
        pages[page_num] = items
      end
      pages
    end

    # Enumerator yielding (page_number, items) pairs.
    #
    # Paging stops when any of the following happens:
    # - a page with no matching nodes has been yielded;
    # - the next page number exceeds +limit+;
    # - +:no_more+ is thrown, presumably by +parse+ in Common (confirm).
    #
    # NOTE: the ERB template is rendered with this method's +binding+, so its
    # locals are visible to the template. Templates rely on +n+ being the
    # current page number — do not rename it.
    def items_enum
      Enumerator.new do |yielder|
        items = [nil] # non-empty sentinel so the first page is always fetched
        catch :no_more do
          until items.empty?
            n = @enum.next
            break if n > limit
            url = @tmpl.result binding
            doc = parse url
            items = doc.css @selector
            yielder.yield(n, items)
            update_progress Thread.current, n if @progress
            sleep @interval if @interval
          end
        end
      end
    end

    # Runs +_fetch+ on +@threads+ workers, which all draw page numbers from the
    # shared +@enum+. Each worker returns a sparse array indexed by page
    # number; these are merged into one array.
    # Thread#value joins each worker and re-raises any exception it raised.
    def parallel_fetch
      workers = @threads.times.map { Thread.new { _fetch } }
      workers.each_with_object([]) do |worker, pages|
        worker.value.each_with_index do |items, page_num|
          pages[page_num] = items if items
        end
      end
    end

    # Options passed to Enum / MutexEnum when building +@enum+.
    def enum_options
      { from: @from, step: @step }
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
page_by_page-0.1.13 | lib/page_by_page/fetch.rb |