lib/rhosync/source_sync.rb in rhosync-2.1.16 vs lib/rhosync/source_sync.rb in rhosync-2.1.17.beta1

- old
+ new

@@ -79,10 +79,33 @@ SourceJob.queue = queue_name Resque.enqueue(SourceJob,job_type,@source.id, @source.app_id,@source.user_id,client_id,params) end + def fast_insert(new_objs, timeout=10,raise_on_expire=false) + @source.lock(:md,timeout,raise_on_expire) do |s| + diff_count = new_objs.size + @source.put_data(:md, new_objs, true) + @source.update_count(:md_size,diff_count) + end + end + + def fast_update(orig_hash, new_hash, timeout=10,raise_on_expire=false) + @source.lock(:md,timeout,raise_on_expire) do |s| + @source.delete_data(:md, orig_hash) + @source.put_data(:md, new_hash, true) + end + end + + def fast_delete(delete_objs, timeout=10,raise_on_expire=false) + @source.lock(:md,timeout,raise_on_expire) do |s| + diff_count = -delete_objs.size + @source.delete_data(:md, delete_objs) + @source.update_count(:md_size,diff_count) + end + end + def push_objects(objects,timeout=10,raise_on_expire=false,rebuild_md=true) @source.lock(:md,timeout,raise_on_expire) do |s| diff_count = 0 # in case of rebuild_md # we clean-up and rebuild the whole :md doc @@ -257,10 +280,17 @@ def _get_data(method) if @adapter.respond_to?(method) data = @adapter.send(method) if data @source.put_value(method,data) - @source.put_value("#{method}_sha1",Digest::SHA1.hexdigest(data)) + if method == :schema + parsed = JSON.parse(data) + schema_version = parsed['version'] + raise "Mandatory version key is not defined in source adapter schema method" if schema_version.nil? + @source.put_value("#{method}_sha1",Digest::SHA1.hexdigest(schema_version)) + else + @source.put_value("#{method}_sha1",Digest::SHA1.hexdigest(data)) + end end end end # Read Operation; params are query arguments