module Rhoconnect
  class SourceSync
    attr_reader :adapter

    def initialize(source)
      @source = source
      raise InvalidArgumentError.new('Invalid source') if @source.nil?
      raise InvalidArgumentError.new('Invalid app for source') unless @source.app
      @adapter = SourceAdapter.create(@source)
    end

    # CUD Operations
    def create
      _measure_and_process_cud('create')
    end

    def update
      _measure_and_process_cud('update')
    end

    def delete
      _measure_and_process_cud('delete')
    end

    # Pass through CUD to adapter, no data stored
    def pass_through_cud(cud_params,query_params)
      return if _auth_op('login') == false
      res,processed_objects = {},[]
      begin
        ['create','update','delete'].each do |op|
          objects = cud_params[op]
          objects.each do |key,value|
            # update and delete need the object id folded into the attributes
            value['id'] = key unless op == 'create'
            @adapter.send(op.to_sym,value)
            processed_objects << key
          end if objects
        end
      rescue Exception => e
        log "Error in pass through method: #{e.message}"
        res['error'] = {'message' => e.message}
      end
      _auth_op('logoff')
      res['processed'] = processed_objects
      res.to_json
    end

    # Read Operation; params are query arguments
    def read(client_id=nil,params=nil)
      _read('query',client_id,params)
    end

    def search(client_id=nil,params=nil)
      return if _auth_op('login',client_id) == false
      res = _read('search',client_id,params)
      _auth_op('logoff',client_id)
      res
    end

    # Process CUD on a queue if one is configured, otherwise inline
    def process_cud
      if @source.cud_queue or @source.queue
        async(:cud,@source.cud_queue || @source.queue)
      else
        do_cud
      end
    end

    def do_cud
      return if _auth_op('login') == false
      self.create
      self.update
      self.delete
      _auth_op('logoff')
    end

    # Process a query on a queue if one is configured, otherwise inline
    def process_query(params=nil)
      if @source.query_queue or @source.queue
        async(:query,@source.query_queue || @source.queue,params)
      else
        do_query(params)
      end
    end

    def do_query(params=nil)
      result = nil
      @source.if_need_refresh do
        Rhoconnect::Stats::Record.update("source:query:#{@source.name}") do
          if _auth_op('login')
            result = self.read(nil,params)
            _auth_op('logoff')
          end
          # re-wind refresh time in case of error
          query_failure = Store.exists?(@source.docname(:errors))
          @source.rewind_refresh_time(query_failure)
        end
      end
      result
    end

    # Enqueue a job for the source based on job type
    def async(job_type,queue_name,params=nil)
      SourceJob.queue = queue_name
      Resque.enqueue(SourceJob,job_type,@source.id,
        @source.app_id,@source.user_id,params)
    end
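    # A minimal dispatch sketch (illustrative names; assumes a registered source
    # and, for the queued path, a running Resque worker):
    #
    #   source = Source.load('Product', :app_id => app.id, :user_id => 'testuser')
    #   sync = SourceSync.new(source)
    #   sync.process_query   # enqueues a SourceJob when a queue is configured,
    #                        # otherwise runs do_query inline
    #   sync.process_cud     # same queued-vs-inline rule for create/update/delete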
    def fast_insert(new_objs,timeout=10,raise_on_expire=false)
      @source.lock(:md,timeout,raise_on_expire) do |s|
        diff_count = new_objs.size
        @source.put_data(:md,new_objs,true)
        @source.update_count(:md_size,diff_count)
      end
      @source.announce_changes
    end

    def fast_update(remove_hash,new_hash,timeout=10,raise_on_expire=false)
      return unless ((remove_hash and remove_hash.size > 0) or (new_hash and new_hash.size > 0))

      @source.lock(:md,timeout,raise_on_expire) do |s|
        # get the objects from DB, remove prev attr data, add new attr data
        update_keys = Set.new
        update_keys += Set.new(remove_hash.keys) if remove_hash
        update_keys += Set.new(new_hash.keys) if new_hash
        objs_to_update = @source.get_objects(:md,update_keys.to_a) || {}
        diff_count = -objs_to_update.size

        # remove old values from DB
        @source.delete_data(:md,objs_to_update)

        # update data
        remove_hash.each do |key,obj|
          next unless objs_to_update[key]
          obj.each do |attrib,value|
            objs_to_update[key].delete(attrib)
          end
          # drop the object entirely once all of its attributes are removed
          # (checked after the loop so the hash can't be deleted mid-iteration)
          objs_to_update.delete(key) if objs_to_update[key].empty?
        end if remove_hash
        new_hash.each do |key,obj|
          objs_to_update[key] ||= {}
          objs_to_update[key].merge!(obj)
        end if new_hash

        # store new data into DB
        @source.put_data(:md,objs_to_update,true)
        diff_count += objs_to_update.size
        @source.update_count(:md_size,diff_count)
      end
      @source.announce_changes
    end

    def fast_delete(delete_objs,timeout=10,raise_on_expire=false)
      @source.lock(:md,timeout,raise_on_expire) do |s|
        diff_count = -delete_objs.size
        @source.delete_data(:md,delete_objs)
        @source.update_count(:md_size,diff_count)
      end
      @source.announce_changes
    end
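    # Worked example of the fast_update contract (object shapes illustrative):
    #
    #   # :md starts as { '1' => { 'price' => '100', 'qty' => '5' } }
    #   fast_update({ '1' => { 'qty' => '5' } },     # attributes to remove, by object id
    #               { '1' => { 'price' => '120' } }) # attributes to add or overwrite
    #   # :md ends as   { '1' => { 'price' => '120' } }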
    def push_objects(objects,timeout=10,raise_on_expire=false,rebuild_md=true)
      @source.lock(:md,timeout,raise_on_expire) do |s|
        diff_count = 0
        if rebuild_md
          # rebuild path: fetch, patch and rewrite the whole :md doc on every request
          doc = @source.get_data(:md)
          orig_doc_size = doc.size
          objects.each do |id,obj|
            doc[id] ||= {}
            doc[id].merge!(obj)
          end
          diff_count = doc.size - orig_doc_size
          @source.put_data(:md,doc)
        else
          # otherwise operate only on the given set values, a big optimization
          # for small transactions
          diff_count = @source.update_objects(:md, objects)
        end
        @source.update_count(:md_size,diff_count)
      end
      @source.announce_changes
    end

    def push_deletes(objects,timeout=10,raise_on_expire=false,rebuild_md=true)
      @source.lock(:md,timeout,raise_on_expire) do |s|
        diff_count = 0
        if rebuild_md
          # rebuild path: fetch, patch and rewrite the whole :md doc on every request
          doc = @source.get_data(:md)
          orig_doc_size = doc.size
          objects.each { |id| doc.delete(id) }
          diff_count = doc.size - orig_doc_size
          @source.put_data(:md,doc)
        else
          # otherwise operate only on the given set values, a big optimization
          # for small transactions
          diff_count = -@source.remove_objects(:md, objects)
        end
        @source.update_count(:md_size,diff_count)
      end
      @source.announce_changes
    end
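    # Usage sketch for the two write paths (values illustrative): pass
    # rebuild_md=false when only a handful of objects change, so the store
    # patches those keys instead of rewriting the whole master document:
    #
    #   sync.push_objects({ '1' => { 'price' => '120' } }, 10, false, false)
    #   sync.push_deletes(['2', '3'], 10, false, false)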
    private

    def _auth_op(operation,client_id=-1)
      # errors go to the source error doc, or to the client's search error doc
      # when a client is given
      edockey = client_id == -1 ? @source.docname(:errors) :
        Client.load(client_id,{:source_name => @source.name}).docname(:search_errors)
      begin
        Store.flash_data(edockey) if operation == 'login'
        @adapter.send operation
      rescue Exception => e
        log "SourceAdapter raised #{operation} exception: #{e}"
        log e.backtrace.join("\n")
        Store.put_data(edockey,{"#{operation}-error"=>{'message'=>e.message}},true)
        return false
      end
      true
    end

    def _process_create(modified_recs,key,value)
      link = @adapter.create value
      # Store object-id link for the client
      # If we have a link, store object in client document
      # Otherwise, store object for delete on client
      modified_recs.each do |modified_rec|
        if link
          modified_rec[:links] ||= {}
          modified_rec[:links][modified_rec[:key]] = { 'l' => link.to_s }
          modified_rec[:creates] ||= {}
          modified_rec[:creates][link.to_s] = value
        else
          modified_rec[:deletes] ||= {}
          modified_rec[:deletes][modified_rec[:key]] = value
        end
      end
    end

    def _process_update(modified_recs,key,value)
      # Add id to object hash to forward to backend call
      value['id'] = key
      # Perform operation
      @adapter.update value
    end

    def _process_delete(modified_recs,key,value)
      # Add id to object hash to forward to backend call
      value['id'] = key
      # Perform operation
      @adapter.delete value
      modified_recs.each do |modified_rec|
        modified_rec[:dels] ||= {}
        modified_rec[:dels][modified_rec[:key]] = value
      end
    end

    def _measure_and_process_cud(operation)
      Rhoconnect::Stats::Record.update("source:#{operation}:#{@source.name}") do
        _process_cud(operation)
      end
    end

    def _process_cud(operation)
      # take everything from the queue and erase it
      # so that no other process will be able to process it again
      operation_hashes = []
      client_ids = []
      @source.lock(operation) do |s|
        operation_hashes,client_ids = s.get_zdata(operation)
        s.flush_zdata(operation)
      end

      invalid_meta = @adapter.validate(operation,operation_hashes,client_ids)

      errors,links,deletes,creates,dels = {},{},{},{},{}
      operation_hashes.each_with_index do |client_operation_data,index|
        client_id = client_ids[index]
        current_invalid_meta = invalid_meta[index] || {}
        client_operation_data.each do |key, value|
          begin
            continue_loop = true
            modified_recs = [{:client_id => client_id, :key => key, :value => value}]
            record_invalid_meta = current_invalid_meta[key] || {}

            # Remove object from queue
            client_operation_data.delete(key)

            # skip the rec if it is a duplicate of some other record
            next if record_invalid_meta[:duplicate_of]

            # prepare duplicate docs
            duplicates = record_invalid_meta[:duplicates] || {}
            duplicates.each { |duplicate| modified_recs << duplicate }

            # raise conflict error if record is marked with one
            raise SourceAdapterObjectConflictError.new(record_invalid_meta[:error]) if record_invalid_meta[:error]

            # Call on source adapter to process individual object
            case operation
            when 'create'
              _process_create(modified_recs,key,value)
            when 'update'
              _process_update(modified_recs,key,value)
            when 'delete'
              _process_delete(modified_recs,key,value)
            end
          rescue Exception => e
            log "SourceAdapter raised #{operation} exception: #{e}"
            log e.backtrace.join("\n")
            continue_loop = false
            modified_recs.each do |modified_rec|
              modified_rec[:errors] ||= {}
              modified_rec[:errors][modified_rec[:key]] = modified_rec[:value]
              modified_rec[:errors]["#{modified_rec[:key]}-error"] = {'message'=>e.message}
            end
          end

          # fold per-record results into the per-client docs
          { :errors => errors,
            :links => links,
            :deletes => deletes,
            :creates => creates,
            :dels => dels }.each do |doc_name, doc|
            modified_recs.each do |modified_rec|
              doc[modified_rec[:client_id]] ||= {}
              next unless modified_rec[doc_name]
              doc[modified_rec[:client_id]].merge!(modified_rec[doc_name])
            end
          end
          break unless continue_loop
        end

        # Record rest of queue (if something in the middle failed)
        unless client_operation_data.empty?
          @source.put_zdata(operation,client_id,client_operation_data,true)
        end
      end

      # now, process all the modified docs
      processed_clients = {}
      client_ids.each do |client_id|
        processed_clients[client_id] ||= Client.load(client_id,{:source_name => @source.name})
      end

      { "delete_page" => deletes,
        "#{operation}_links" => links,
        "#{operation}_errors" => errors }.each do |doctype,client_docs|
        client_docs.each do |client_id,data|
          client = processed_clients[client_id]
          client.put_data(doctype,data,true) unless data.empty?
        end
      end

      if operation == 'create'
        total_creates = {}
        creates.each do |client_id,client_doc|
          next if client_doc.empty?
          client = processed_clients[client_id]
          client.put_data(:cd,client_doc,true)
          client.update_count(:cd_size,client_doc.size)
          total_creates.merge!(client_doc)
        end
        unless total_creates.empty?
          @source.lock(:md) do |s|
            s.put_data(:md,total_creates,true)
            s.update_count(:md_size,total_creates.size)
          end
        end
      end

      if operation == 'delete'
        # Clean up deleted objects from master document and corresponding client document
        total_dels = {}
        dels.each do |client_id,client_doc|
          next if client_doc.empty?
          client = processed_clients[client_id]
          client.delete_data(:cd,client_doc)
          client.update_count(:cd_size,-client_doc.size)
          total_dels.merge!(client_doc)
        end
        unless total_dels.empty?
          @source.lock(:md) do |s|
            s.delete_data(:md,total_dels)
            s.update_count(:md_size,-total_dels.size)
          end
        end
      end

      if operation == 'update'
        # Stash pre-update values so a failed update can be rolled back
        errors.each do |client_id, error_doc|
          next if error_doc.empty?
          client = processed_clients[client_id]
          cd = client.get_data(:cd)
          error_doc.each do |key, value|
            client.put_data(:update_rollback,{key => cd[key]},true) if cd[key]
          end
        end
      end
    end

    # Metadata Operation; source adapter returns json
    def _get_data(method)
      return unless @adapter.respond_to?(method)
      data = @adapter.send(method)
      return unless data
      @source.put_value(method,data)
      if method == :schema
        schema_version = JSON.parse(data)['version']
        raise "Mandatory version key is not defined in source adapter schema method" if schema_version.nil?
        @source.put_value("#{method}_sha1",Digest::SHA1.hexdigest(schema_version))
      else
        @source.put_value("#{method}_sha1",Digest::SHA1.hexdigest(data))
      end
    end

    # Read Operation; params are query arguments
    def _read(operation,client_id,params=nil)
      errordoc = nil
      result = nil
      begin
        if operation == 'search'
          client = Client.load(client_id,{:source_name => @source.name})
          errordoc = client.docname(:search_errors)
          compute_token(client.docname(:search_token))
          result = @adapter.search(params)
          @adapter.save(client.docname(:search)) unless @source.is_pass_through?
        else
          errordoc = @source.docname(:errors)
          [:metadata,:schema].each do |method|
            _get_data(method)
          end
          result = @adapter.do_query(params)
        end
        # operation/sync succeeded, remove errors
        Store.lock(errordoc) do
          Store.flash_data(errordoc)
        end
      rescue Exception => e
        # store sync/operation exceptions to be sent to all clients for this source/user
        log "SourceAdapter raised #{operation} exception: #{e}"
        log e.backtrace.join("\n")
        Store.lock(errordoc) do
          Store.put_data(errordoc,{"#{operation}-error"=>{'message'=>e.message}},true)
        end
      end
      # pass through expects result hash
      @source.is_pass_through? ? result : true
    end
  end
end
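# The SourceAdapter contract exercised above, collected from the calls in this
# file (a reference sketch; concrete adapters subclass SourceAdapter and
# implement whichever hooks they need):
#
#   login / logoff                     - session hooks driven by _auth_op
#   do_query(params) / search(params)  - read paths used by _read
#   create(hash)                       - its return value becomes the client's
#                                        object link ('l' entry)
#   update(hash) / delete(hash)        - receive the attributes plus 'id'
#   validate(op,hashes,client_ids)     - may mark duplicates and conflicts
#   save(docname)                      - persists search results
#   metadata / schema (optional)       - return JSON; schema must include a
#                                        'version' key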