lib/assimilate/batch.rb in assimilate-0.0.2 vs lib/assimilate/batch.rb in assimilate-0.0.3
- old
+ new
@@ -1,10 +1,12 @@
class Assimilate::Batch
attr_reader :domain, :idfield, :datestamp
def initialize(args)
@catalog = args[:catalog]
+ @domainkey = @catalog.config[:domain]
+
@domain = args[:domain]
@datestamp = args[:datestamp]
@idfield = args[:idfield]
@filename = args[:filename]
@@ -16,15 +18,15 @@
@deletes = []
@resolved = false
end
def load_baseline
- stored_records = @catalog.catalog.find(@catalog.domainkey => @domain).to_a
+ stored_records = @catalog.catalog.find(@domainkey => @domain).to_a
@baseline = stored_records.each_with_object({}) do |rec, h|
key = rec[@idfield]
if h.include?(key)
- raise Assimilate::CorruptDataError, "Duplicate records for key [#{key}] in domain [#{@domain}]"
+ raise Assimilate::CorruptDataError, "Duplicate records for key [#{key}] in #{@domainkey} [#{@domain}]"
end
h[key] = rec
end
end
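
The changes above make the partitioning field configurable: the batch now reads the field name from the catalog's config (:domain) instead of a fixed @catalog.domainkey, both in the baseline query and in the duplicate-key error message. For reference, a standalone sketch of what load_baseline builds from the stored records; the field names and values here are hypothetical, not part of the gem:

    idfield = 'part_no'                 # whatever :idfield the batch was created with
    stored_records = [
      { 'client' => 'acme', 'part_no' => 'X-100' },
      { 'client' => 'acme', 'part_no' => 'X-200' }
    ]

    baseline = stored_records.each_with_object({}) do |rec, h|
      key = rec[idfield]
      raise "Duplicate records for key [#{key}]" if h.include?(key)
      h[key] = rec
    end
    # baseline.keys => ["X-100", "X-200"]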
@@ -52,11 +54,11 @@
# compute anything needed before we can write updates to permanent store
# * find records that have been deleted
def resolve
if !@resolved
- @deleted_keys = @baseline.keys - @seen.keys
+ @deleted_keys = (@baseline.keys - @seen.keys).reject {|k| @baseline[k][@catalog.config[:deletion_marker]]}
@updated_field_counts = @changes.each_with_object(Hash.new(0)) do |rec,h|
key = rec[idfield]
diffs = rec.diff(stripped_record_for(key))
diffs.keys.each do |f|
@@ -74,10 +76,11 @@
:baseline_count => @baseline.size,
:final_count => @baseline.size + @adds.count,
:adds_count => @adds.count,
:new_ids => @adds.map {|rec| rec[idfield]},
:deletes_count => @deleted_keys.count,
+ :deleted_ids => @deleted_keys,
:updates_count => @changes.count,
:unchanged_count => @noops.count,
:updated_fields => @updated_field_counts
}
end
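
Two related changes land here: resolve now rejects baseline records that already carry the configured deletion marker, so an id deleted in an earlier batch is neither counted nor re-stamped, and the stats hash gains :deleted_ids alongside the existing :deletes_count. A standalone sketch of the new filtering, with the marker name and sample data assumed for illustration:

    deletion_marker = '_dt_removed'     # assumed default; 0.0.3 reads it from @catalog.config

    baseline = {
      'X-100' => { 'part_no' => 'X-100' },
      'X-200' => { 'part_no' => 'X-200', deletion_marker => '20120101' }   # removed earlier
    }
    seen = {}                           # neither id appears in the incoming batch

    deleted_keys = (baseline.keys - seen.keys).reject { |k| baseline[k][deletion_marker] }
    # deleted_keys => ["X-100"]; X-200 is left alone instead of being deleted again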
@@ -90,28 +93,28 @@
apply_inserts
apply_updates
end
def record_batch
- raise(Assimilate::DuplicateImportError, "duplicate batch for datestamp #{datestamp}") if @catalog.batches.find('domain' => @domain, 'datestamp' => @datestamp).to_a.any?
- raise(Assimilate::DuplicateImportError, "duplicate batch for file #{@filename}") if @catalog.batches.find('domain' => @domain, 'filename' => @filename).to_a.any?
+ raise(Assimilate::DuplicateImportError, "duplicate batch for datestamp #{datestamp}") if @catalog.batches.find(@domainkey => @domain, 'datestamp' => @datestamp).to_a.any?
+ raise(Assimilate::DuplicateImportError, "duplicate batch for file #{@filename}") if @catalog.batches.find(@domainkey => @domain, 'filename' => @filename).to_a.any?
@catalog.batches.insert({
- 'domain' => @domain,
+ @domainkey => @domain,
'datestamp' => @datestamp,
'filename' => @filename
})
end
def apply_deletes
@deleted_keys.each do |key|
@catalog.catalog.update(
{
- @catalog.domainkey => domain,
+ @domainkey => domain,
idfield => key
},
- {"$set" => {:_dt_removed => datestamp}}
+ {"$set" => {@catalog.config[:deletion_marker] => datestamp}}
)
end
end
INSERT_BATCH_SIZE = 1000 # default batch size for bulk loading into mongo
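
In this hunk both the batches collection and the soft-delete path pick up the configurable names: record_batch checks for, and records, prior imports under the configured domain field rather than the literal 'domain', and apply_deletes stamps the datestamp into the configured deletion-marker field instead of the hardcoded :_dt_removed. A sketch of the documents involved, with hypothetical field names and values:

    domainkey       = 'client'          # hypothetical @catalog.config[:domain]
    deletion_marker = '_dt_removed'     # hypothetical @catalog.config[:deletion_marker]
    datestamp       = '20120131'

    batch_doc = { domainkey => 'acme', 'datestamp' => datestamp, 'filename' => 'acme.csv' }
    # @catalog.batches.insert(batch_doc) -- only after the duplicate-import checks pass

    selector = { domainkey => 'acme', 'part_no' => 'X-200' }
    modifier = { '$set' => { deletion_marker => datestamp } }
    # @catalog.catalog.update(selector, modifier) -- marks the record deleted without removing it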
@@ -125,20 +128,20 @@
def apply_updates
@changes.each do |rec|
@catalog.catalog.update(
{
- @catalog.domainkey => domain,
+ @domainkey => domain,
idfield => rec[idfield]
},
{"$set" => rec}
)
end
end
def decorate(records)
records.map do |r|
- r[@catalog.domainkey] = @domain
+ r[@domainkey] = @domain
r.to_hash
end
end
end
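
apply_updates and decorate follow the same pattern: updates are addressed by the configured domain field, and decorate tags every incoming record with that field before the bulk insert. A minimal sketch of the tagging, using a plain hash in place of whatever record objects the gem handles (the original calls r.to_hash):

    domainkey = 'client'                # hypothetical @catalog.config[:domain]
    records   = [{ 'part_no' => 'X-300', 'desc' => 'new widget' }]

    decorated = records.map { |r| r.merge(domainkey => 'acme') }
    # => [{"part_no"=>"X-300", "desc"=>"new widget", "client"=>"acme"}]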