lib/assimilate/batch.rb in assimilate-0.0.2 vs lib/assimilate/batch.rb in assimilate-0.0.3

- old
+ new

@@ -1,10 +1,12 @@
 class Assimilate::Batch
   attr_reader :domain, :idfield, :datestamp
 
   def initialize(args)
     @catalog = args[:catalog]
+    @domainkey = @catalog.config[:domain]
+
     @domain = args[:domain]
     @datestamp = args[:datestamp]
     @idfield = args[:idfield]
     @filename = args[:filename]
@@ -16,15 +18,15 @@
     @deletes = []
 
     @resolved = false
   end
 
   def load_baseline
-    stored_records = @catalog.catalog.find(@catalog.domainkey => @domain).to_a
+    stored_records = @catalog.catalog.find(@domainkey => @domain).to_a
     @baseline = stored_records.each_with_object({}) do |rec, h|
       key = rec[@idfield]
       if h.include?(key)
-        raise Assimilate::CorruptDataError, "Duplicate records for key [#{key}] in domain [#{@domain}]"
+        raise Assimilate::CorruptDataError, "Duplicate records for key [#{key}] in #{@domainkey} [#{@domain}]"
       end
       h[key] = rec
     end
   end
@@ -52,11 +54,11 @@
   # compute anything needed before we can write updates to permanent store
   # * find records that have been deleted
   def resolve
     if !@resolved
-      @deleted_keys = @baseline.keys - @seen.keys
+      @deleted_keys = (@baseline.keys - @seen.keys).reject {|k| @baseline[k][@catalog.config[:deletion_marker]]}
 
       @updated_field_counts = @changes.each_with_object(Hash.new(0)) do |rec,h|
         key = rec[idfield]
         diffs = rec.diff(stripped_record_for(key))
         diffs.keys.each do |f|
@@ -74,10 +76,11 @@
       :baseline_count => @baseline.size,
       :final_count => @baseline.size + @adds.count,
       :adds_count => @adds.count,
       :new_ids => @adds.map {|rec| rec[idfield]},
       :deletes_count => @deleted_keys.count,
+      :deleted_ids => @deleted_keys,
       :updates_count => @changes.count,
       :unchanged_count => @noops.count,
       :updated_fields => @updated_field_counts
     }
   end
@@ -90,28 +93,28 @@
     apply_inserts
     apply_updates
   end
 
   def record_batch
-    raise(Assimilate::DuplicateImportError, "duplicate batch for datestamp #{datestamp}") if @catalog.batches.find('domain' => @domain, 'datestamp' => @datestamp).to_a.any?
-    raise(Assimilate::DuplicateImportError, "duplicate batch for file #{@filename}") if @catalog.batches.find('domain' => @domain, 'filename' => @filename).to_a.any?
+    raise(Assimilate::DuplicateImportError, "duplicate batch for datestamp #{datestamp}") if @catalog.batches.find(@domainkey => @domain, 'datestamp' => @datestamp).to_a.any?
+    raise(Assimilate::DuplicateImportError, "duplicate batch for file #{@filename}") if @catalog.batches.find(@domainkey => @domain, 'filename' => @filename).to_a.any?
 
     @catalog.batches.insert({
-      'domain' => @domain,
+      @domainkey => @domain,
       'datestamp' => @datestamp,
       'filename' => @filename
     })
   end
 
   def apply_deletes
     @deleted_keys.each do |key|
       @catalog.catalog.update(
         {
-          @catalog.domainkey => domain,
+          @domainkey => domain,
           idfield => key
         },
-        {"$set" => {:_dt_removed => datestamp}}
+        {"$set" => {@catalog.config[:deletion_marker] => datestamp}}
       )
     end
   end
 
   INSERT_BATCH_SIZE = 1000 # default batch size for bulk loading into mongo
@@ -125,20 +128,20 @@
 
   def apply_updates
     @changes.each do |rec|
       @catalog.catalog.update(
         {
-          @catalog.domainkey => domain,
+          @domainkey => domain,
           idfield => rec[idfield]
         },
         {"$set" => rec}
      )
    end
  end
 
   def decorate(records)
     records.map do |r|
-      r[@catalog.domainkey] = @domain
+      r[@domainkey] = @domain
       r.to_hash
     end
   end
 end
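
Net effect of the 0.0.3 change: the field used to scope records to a domain and the field used to mark deletions are no longer hardcoded ('domain' and :_dt_removed) but read from the catalog's config, and resolve stops re-deleting records that already carry the deletion marker. Below is a minimal sketch of that last behavior only, using a hypothetical config hash; the field names 'source' and '_removed' are illustrative and not defaults documented by the gem.

# Hypothetical config; in 0.0.3 these keys are read via @catalog.config.
config = { :domain => 'source', :deletion_marker => '_removed' }

baseline = {
  "a" => { "id" => "a" },
  "b" => { "id" => "b", config[:deletion_marker] => "20120401" }  # already marked deleted
}
seen = { "a" => true }

# 0.0.2 recomputed baseline.keys - seen.keys, so "b" would be re-deleted on every run.
# 0.0.3 rejects keys whose baseline record already carries the deletion marker:
deleted_keys = (baseline.keys - seen.keys).reject { |k| baseline[k][config[:deletion_marker]] }
p deleted_keys  # => []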