lib/rhosync/source_sync.rb in rhosync-2.1.16 vs lib/rhosync/source_sync.rb in rhosync-2.1.17.beta1
- old
+ new
@@ -79,10 +79,33 @@
SourceJob.queue = queue_name
Resque.enqueue(SourceJob,job_type,@source.id,
@source.app_id,@source.user_id,client_id,params)
end
+ def fast_insert(new_objs, timeout=10,raise_on_expire=false)
+ @source.lock(:md,timeout,raise_on_expire) do |s|
+ diff_count = new_objs.size
+ @source.put_data(:md, new_objs, true)
+ @source.update_count(:md_size,diff_count)
+ end
+ end
+
+ def fast_update(orig_hash, new_hash, timeout=10,raise_on_expire=false)
+ @source.lock(:md,timeout,raise_on_expire) do |s|
+ @source.delete_data(:md, orig_hash)
+ @source.put_data(:md, new_hash, true)
+ end
+ end
+
+ def fast_delete(delete_objs, timeout=10,raise_on_expire=false)
+ @source.lock(:md,timeout,raise_on_expire) do |s|
+ diff_count = -delete_objs.size
+ @source.delete_data(:md, delete_objs)
+ @source.update_count(:md_size,diff_count)
+ end
+ end
+
def push_objects(objects,timeout=10,raise_on_expire=false,rebuild_md=true)
@source.lock(:md,timeout,raise_on_expire) do |s|
diff_count = 0
# in case of rebuild_md
# we clean-up and rebuild the whole :md doc
@@ -257,10 +280,17 @@
def _get_data(method)
if @adapter.respond_to?(method)
data = @adapter.send(method)
if data
@source.put_value(method,data)
- @source.put_value("#{method}_sha1",Digest::SHA1.hexdigest(data))
+ if method == :schema
+ parsed = JSON.parse(data)
+ schema_version = parsed['version']
+ raise "Mandatory version key is not defined in source adapter schema method" if schema_version.nil?
+ @source.put_value("#{method}_sha1",Digest::SHA1.hexdigest(schema_version))
+ else
+ @source.put_value("#{method}_sha1",Digest::SHA1.hexdigest(data))
+ end
end
end
end
# Read Operation; params are query arguments