# General class to handle comparing and pushing data to the remote end.
#
# Relies on FasterCSV, ActiveSupport (Hash.from_xml, blank?), ActiveResource,
# Net::SMTP and HighLine's `agree`, plus the IGNORE_ATTRIBUTES,
# DATETIME_ATTRIBUTES, BOOLEAN_ATTRIBUTES, INTEGER_ATTRIBUTES and ONEBODY_SITE
# constants, all of which are expected to be provided by the surrounding script.
# A subclass (or the surrounding script) is expected to set `resource` to an
# ActiveResource class and to supply a `name_for(record)` helper.
class UpdateAgent
  MAX_HASHES_AT_A_TIME   = 100
  MAX_TO_BATCH_AT_A_TIME = 10
  SLEEP_PERIOD           = 3
  DEBUG                  = false

  def initialize(data=nil, options={})
    @options = options
    @attributes = []
    @data   = []
    @create = []
    @update = []
    if data
      if data.is_a?(Array)
        @data = data
        @attributes = data.first.keys.sort
      else
        read_from_file(data)
      end
    end
    check_for_invalid_columns
  end

  def check_for_invalid_columns
    if @data.detect { |row| row['id'] }
      puts "Error: one or more records contain an 'id' column."
      puts "You must utilize 'legacy_id' rather than 'id' so that"
      puts "identity and foreign keys are maintained from your"
      puts "existing membership management database."
      exit
    end
  end

  # Load data from a CSV file, converting booleans, dates, and integers.
  # The first row must be the attribute names.
  def read_from_file(filename)
    csv = FasterCSV.open(filename, 'r')
    @attributes = csv.shift
    record_count = 0
    @data = csv.map do |row|
      hash = {}
      row.each_with_index do |value, index|
        key = @attributes[index]
        next if IGNORE_ATTRIBUTES.include?(key)
        if DATETIME_ATTRIBUTES.include?(key)
          if value.blank?
            value = nil
          else
            begin
              value = DateTime.parse(value)
            rescue ArgumentError
              puts "Invalid date in #{filename} record #{record_count + 1} (#{key}) - #{value}"
              exit(1)
            end
          end
        elsif BOOLEAN_ATTRIBUTES.include?(key)
          if value == '' or value == nil
            value = nil
          elsif %w(no false 0).include?(value.downcase)
            value = false
          else
            value = true
          end
        elsif INTEGER_ATTRIBUTES.include?(key)
          value = value.to_s != '' ? value.scan(/\d/).join.to_i : nil
        end
        hash[key] = value
      end
      record_count += 1
      print "reading record #{record_count}\r"
      hash['deleted'] = false
      hash
    end
    puts
    @attributes << 'deleted'
    @attributes.reject! { |a| IGNORE_ATTRIBUTES.include?(a) }
  end

  def ids
    @data.map { |r| r['id'] }.compact
  end

  def legacy_ids
    @data.map { |r| r['legacy_id'] }.compact
  end

  def compare(force=false)
    compare_hashes(legacy_ids, force)
  end

  def has_work?
    (@create + @update).any?
  end

  def present
    puts "The following #{resource.name.downcase} records will be pushed..."
    puts 'legacy id  name'
    puts '---------- -------------------------------------'
    @create.each { |r| present_record(r, true) }
    @update.each { |r| present_record(r) }
    puts
  end

  def present_record(row, new=false)
    puts "#{row['legacy_id'].to_s.ljust(10)} #{name_for(row).to_s.ljust(40)} #{new ? '(new)' : ' '}"
    if DEBUG
      puts row.values_hash(@attributes)
      puts row['remote_hash']
    end
  end

  def confirm
    agree('Do you want to continue, pushing these records to OneBody? ')
  end

  # Use ActiveResource to create/update records on the remote end,
  # MAX_TO_BATCH_AT_A_TIME records per request.
  def push
    @errors = []
    puts 'Updating remote end...'
    index = 0
    print "#{resource.name} 0/0\r"; STDOUT.flush
    (@create + @update).each_slice(MAX_TO_BATCH_AT_A_TIME) do |records|
      response = resource.post(:batch, {}, records.to_xml)
      statuses = Hash.from_xml(response.body)['records']
      statuses.each do |status|
        record = data_by_id[status['legacy_id']]
        record['id'] = status['id']
        if status['status'] == 'error'
          puts "#{status['legacy_id']}: #{status['error']}"
          @errors << {:record => record, :error => status['error']}
          record['error_messages'] = status['error']
        end
      end
      index += records.length
      print "#{resource.name} #{index}/#{@create.length + @update.length}\r"; STDOUT.flush
      sleep SLEEP_PERIOD
    end
    puts
  end

  def errors
    (@create + @update).select { |r| r['error_messages'] }
  end

  def send_notification
    if n = @options['notifications']
      puts 'Sending notification...'
      if @errors and @errors.any?
        subject = 'OneBody UpdateAgent Errors'
        body = "There were #{@errors.length} error(s) running UpdateAgent.\n\nPlease visit #{ONEBODY_SITE}/admin/syncs for details."
      else
        subject = 'OneBody UpdateAgent Success'
        body = "OneBody UpdateAgent completed without any errors.\n"
      end
      Net::SMTP.start(n['host'], n['port'].to_i) do |smtp|
        smtp.send_message(
          "From: #{n['from_email']}\nTo: #{n['to_email']}\nSubject: #{subject}\n\n#{body}",
          n['from_email'],
          n['to_email']
        )
      end
    end
  end

  def start_sync
    @sync = Sync.create(:complete => false)
  end

  def finish_sync(create_items=true, mark_complete=true)
    if create_items
      @sync.post(
        :create_items, {},
        (
          @create.map { |r| to_sync_item(r, 'create') } +
          @update.map { |r| to_sync_item(r, 'update') }
        ).to_xml
      )
    end
    if mark_complete
      @sync.complete = true
      @sync.error_count = errors.length
      @sync.success_count = (@create + @update).length - errors.length
      @sync.save
    end
  end

  def to_sync_item(record, operation)
    h = {
      :syncable_type => self.resource.name,
      :syncable_id   => record['id'],
      :name          => name_for(record),
      :legacy_id     => record['legacy_id'],
      :operation     => operation,
      :status        => record['error_messages'] ? 'error' : 'success'
    }
    if record['error_messages']
      h[:error_messages] = record['error_messages'].split('; ')
    end
    return h
  end

  def data_by_id
    @data_by_id ||= begin
      by_id = {}
      @data.each { |r| by_id[r['legacy_id'].to_i] = r }
      by_id
    end
  end

  attr_accessor :attributes, :data, :sync
  attr_reader :update, :create

  class << self; attr_accessor :resource; end
  def resource; self.class.resource; end

  protected

  # Ask the remote end for a value hash for each record (MAX_HASHES_AT_A_TIME
  # at a time) and mark records to create or update based on the response.
  def compare_hashes(ids, force=false)
    ids.each_slice(MAX_HASHES_AT_A_TIME) do |some_ids|
      print '.'; STDOUT.flush
      options = {:attrs => @attributes.join(','), :legacy_id => some_ids.join(',')}
      options.merge!(:debug => true) if DEBUG
      response = resource.post(:hashify, {}, options.to_xml)
      hashes = Hash.from_xml(response.body)['records'].to_a
      hashes.each do |record|
        row = data_by_id[record['legacy_id'].to_i]
        if DEBUG
          row['remote_hash'] = record['hash']
          @update << row if force or row.values_for_hash(@attributes).join != record['hash'] or (resource.name == 'Person' and record['family_id'].nil?)
        else
          @update << row if force or row.values_hash(@attributes) != record['hash'] or (resource.name == 'Person' and record['family_id'].nil?)
        end
      end
      @create += some_ids.reject { |id| hashes.map { |h| h['legacy_id'].to_i }.include?(id.to_i) }.map { |id| data_by_id[id.to_i] }
      sleep SLEEP_PERIOD
    end
    puts
  end
end
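
# Example usage (a hypothetical sketch, not part of the original script): the
# Person resource, site URL, CSV filename, and name_for implementation below
# are assumptions made for illustration only.
#
#   require 'active_resource'
#
#   class Person < ActiveResource::Base
#     self.site = 'https://example.com'   # OneBody API endpoint (assumed)
#   end
#
#   class PeopleUpdater < UpdateAgent
#     self.resource = Person
#
#     def name_for(record)                # subclasses supply a display name
#       "#{record['first_name']} #{record['last_name']}"
#     end
#   end
#
#   agent = PeopleUpdater.new('people.csv')
#   agent.compare                         # ask the server which records differ
#   if agent.has_work?
#     agent.present                       # list what will be created/updated
#     if agent.confirm
#       agent.start_sync
#       agent.push                        # batch-post new/changed records
#       agent.finish_sync
#       agent.send_notification
#     end
#   end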