lib/assimilate/batch.rb in assimilate-0.3.5 vs lib/assimilate/batch.rb in assimilate-0.4.1
- old
+ new
@@ -8,19 +8,26 @@
@domain = args[:domain]
@datestamp = args[:datestamp]
@idfield = args[:idfield]
@filename = args[:filename]
+ @subset = args[:subset]
+ @suppress_deletes = args[:nodeletes]
+
load_baseline
@noops = []
@changes = {}
@adds = []
@deletes = []
@resolved = false
end
+ def prime(fieldnames)
+ @fields = fieldnames
+ end
+
def load_baseline
stored_records = @catalog.catalog.find(@domainkey => @domain, @idfield => {"$exists" => 1}).to_a
@baseline = stored_records.each_with_object({}) do |rec, h|
key = rec[@idfield]
if h.include?(key)
@@ -32,11 +39,15 @@
# The stripped record contains only the data values from the source (no internal values with leading underscores).
# Any nil values are ignored; these should not be stored, but if they do appear in the catalog then we don't want
# to include them when comparing new records vs. old.
def stripped_record_for(key)
- @baseline[key] && @baseline[key].select {|k,v| k !~ /^_/ && !v.nil?}
+ if @subset
+ @baseline[key] && @baseline[key].select {|k,v| @fields.include?(k)}
+ else
+ @baseline[key] && @baseline[key].select {|k,v| k !~ /^_/ && !v.nil?}
+ end
end
def <<(record)
@seen ||= Hash.new(0)
@@ -63,11 +74,11 @@
# * find records that have been deleted
def resolve
if !@resolved
@deleted_keys = (@baseline.keys - @seen.keys).reject {|k| @baseline[k][@catalog.config[:deletion_marker]]}
- @updated_field_counts = @changes.each_with_object(Hash.new(0)) do |(key,diffs),h|
+ @updated_field_counts = @changes.each_with_object(Hash.new(0)) do |(_,diffs),h|
# key = rec[idfield]
# diffs = deltas(stripped_record_for(key), rec)
diffs.keys.each do |f|
h[f] += 1
end
@@ -115,18 +126,22 @@
'filename' => @filename
})
end
def apply_deletes
- @deleted_keys.each do |key|
- @catalog.catalog.update(
- {
- @domainkey => domain,
- idfield => key
- },
- {"$set" => {@catalog.config[:deletion_marker] => datestamp}}
- )
+ unless @suppress_deletes
+ @deleted_keys.each do |key|
+ @catalog.catalog.update(
+ {
+ @domainkey => domain,
+ idfield => key
+ },
+ {
+ "$set" => {@catalog.config[:deletion_marker] => datestamp}
+ }
+ )
+ end
end
end
INSERT_BATCH_SIZE = 1000 # default batch size for bulk loading into mongo
@@ -143,10 +158,12 @@
@catalog.catalog.update(
{
@domainkey => domain,
idfield => key
},
- {"$set" => diffs.merge(marker => datestamp)}
+ {
+ "$set" => diffs.merge(marker => datestamp)
+ }
)
end
end
def decorate(records)