lib/assimilate/batch.rb in assimilate-0.3.5 vs lib/assimilate/batch.rb in assimilate-0.4.1

- old
+ new

@@ -8,19 +8,26 @@ @domain = args[:domain] @datestamp = args[:datestamp] @idfield = args[:idfield] @filename = args[:filename] + @subset = args[:subset] + @suppress_deletes = args[:nodeletes] + load_baseline @noops = [] @changes = {} @adds = [] @deletes = [] @resolved = false end + def prime(fieldnames) + @fields = fieldnames + end + def load_baseline stored_records = @catalog.catalog.find(@domainkey => @domain, @idfield => {"$exists" => 1}).to_a @baseline = stored_records.each_with_object({}) do |rec, h| key = rec[@idfield] if h.include?(key) @@ -32,11 +39,15 @@ # The stripped record contains only the data values from the source (no internal values with leading underscores). # Any nil values are ignored; these should not be stored but if they do appear in the catalog then don't want # to include them when comparing new records vs. old. def stripped_record_for(key) - @baseline[key] && @baseline[key].select {|k,v| k !~ /^_/ && !v.nil?} + if @subset + @baseline[key] && @baseline[key].select {|k,v| @fields.include?(k)} + else + @baseline[key] && @baseline[key].select {|k,v| k !~ /^_/ && !v.nil?} + end end def <<(record) @seen ||= Hash.new(0) @@ -63,11 +74,11 @@ # * find records that have been deleted def resolve if !@resolved @deleted_keys = (@baseline.keys - @seen.keys).reject {|k| @baseline[k][@catalog.config[:deletion_marker]]} - @updated_field_counts = @changes.each_with_object(Hash.new(0)) do |(key,diffs),h| + @updated_field_counts = @changes.each_with_object(Hash.new(0)) do |(_,diffs),h| # key = rec[idfield] # diffs = deltas(stripped_record_for(key), rec) diffs.keys.each do |f| h[f] += 1 end @@ -115,18 +126,22 @@ 'filename' => @filename }) end def apply_deletes - @deleted_keys.each do |key| - @catalog.catalog.update( - { - @domainkey => domain, - idfield => key - }, - {"$set" => {@catalog.config[:deletion_marker] => datestamp}} - ) + unless @suppress_deletes + @deleted_keys.each do |key| + @catalog.catalog.update( + { + @domainkey => domain, + idfield => key + }, + { + "$set" => {@catalog.config[:deletion_marker] => datestamp} + } + ) + end end end INSERT_BATCH_SIZE = 1000 # default batch size for bulk loading into mongo @@ -143,10 +158,12 @@ @catalog.catalog.update( { @domainkey => domain, idfield => key }, - {"$set" => diffs.merge(marker => datestamp)} + { + "$set" => diffs.merge(marker => datestamp) + } ) end end def decorate(records)