lib/dbox/syncer.rb in dbox-0.5.3 vs lib/dbox/syncer.rb in dbox-0.6.0
- old
+ new
@@ -1,8 +1,9 @@
module Dbox
class Syncer
MAX_PARALLEL_DBOX_OPS = 5
+ MIN_BYTES_TO_STREAM_DOWNLOAD = 1024 * 100 # 100kB
include Loggable
def self.create(remote_path, local_path)
api.create_dir(remote_path)
@@ -35,10 +36,11 @@
@@_api ||= API.connect
end
class Operation
include Loggable
+ include Utils
attr_reader :database
def initialize(database, api)
@database = database
@@ -63,42 +65,10 @@
# Returns the remote root path recorded under :remote_path in this
# operation's metadata.
def remote_path
  meta = metadata
  meta[:remote_path]
end
# Deleted by this hunk (every line is marked "-").
# Turned a local path into a path relative to local_path by removing the
# first occurrence of local_path and then a leading "/".
# Raises BadPath when path does not contain local_path.
# NOTE(review): the guard is include?, not a prefix check, so local_path
# occurring anywhere inside path is accepted.
# NOTE(review): the new download_file still calls this name; presumably it
# now comes from the Utils mixin added to Operation -- confirm.
- def local_to_relative_path(path)
- if path.include?(local_path)
- path.sub(local_path, "").sub(/^\//, "")
- else
- raise BadPath, "Not a local path: #{path}"
- end
- end
-
# Deleted by this hunk (every line is marked "-").
# Turned a remote path into a path relative to remote_path by removing the
# first occurrence of remote_path and then a leading "/".
# Raises BadPath when path does not contain remote_path.
# NOTE(review): the guard is include?, not a prefix check.
# NOTE(review): process_basic_remote_props still calls this name;
# presumably it is now provided by the Utils mixin -- confirm.
- def remote_to_relative_path(path)
- if path.include?(remote_path)
- path.sub(remote_path, "").sub(/^\//, "")
- else
- raise BadPath, "Not a remote path: #{path}"
- end
- end
-
# Deleted by this hunk (every line is marked "-").
# Joined a relative path onto local_path; a nil or empty path yielded
# local_path itself.
# NOTE(review): still called throughout the new code; presumably now
# provided by the Utils mixin -- confirm.
- def relative_to_local_path(path)
- if path && path.length > 0
- File.join(local_path, path)
- else
- local_path
- end
- end
-
# Deleted by this hunk (every line is marked "-").
# Joined a relative path onto remote_path; a nil or empty path yielded
# remote_path itself.
# NOTE(review): still called throughout the new code; presumably now
# provided by the Utils mixin -- confirm.
- def relative_to_remote_path(path)
- if path && path.length > 0
- File.join(remote_path, path)
- else
- remote_path
- end
- end
-
# Filters out every entry whose basename (taken from its :path) begins
# with a dot, returning the remaining entries in their original order.
def remove_dotfiles(contents)
  contents.select do |item|
    name = File.basename(item[:path])
    !name.start_with?(".")
  end
end
def current_dir_entries_as_hash(dir)
@@ -114,33 +84,15 @@
# Resolves the database id for +path+, memoizing truthy results in @_ids
# so the database is queried at most once per path with a non-nil id.
def lookup_id_by_path(path)
  cache = (@_ids ||= {})
  known = cache[path]
  return known if known
  cache[path] = database.find_by_path(path)[:id]
end
# Deleted by this hunk (every line is marked "-").
# Normalized a timestamp to a String: a Time was converted to UTC and
# formatted with "%a, %d %b %Y %H:%M:%S +0000"; a String was returned
# unchanged. Any other value matched no branch, so the result was nil.
# NOTE(review): the new modified? methods still call time_to_s in their
# debug logging; presumably it is now provided by Utils -- confirm.
- def time_to_s(t)
- case t
- when Time
- # matches dropbox time format
- t.utc.strftime("%a, %d %b %Y %H:%M:%S +0000")
- when String
- t
- end
- end
-
# Deleted by this hunk (every line is marked "-").
# Normalized a timestamp to a Time: a Time was returned as-is and a String
# was parsed with Time.parse. Any other value matched no branch, so the
# result was nil.
# NOTE(review): still called by the new code (e.g. in
# process_basic_remote_props); presumably now provided by Utils -- confirm.
- def parse_time(t)
- case t
- when Time
- t
- when String
- Time.parse(t)
- end
- end
-
# Runs the given block while preserving the modification time of +path+:
# the mtime is read before yielding and written back with File.utime
# afterwards (the access time is set to the current time).
# Old version ("-"): the method returned File.utime's result.
# New version ("+"): the block's result is captured and returned instead.
def saving_timestamp(path)
mtime = File.mtime(path)
- yield
+ res = yield
File.utime(Time.now, mtime, path)
+ res
end
def saving_parent_timestamp(entry, &proc)
local_path = relative_to_local_path(entry[:path])
parent = File.dirname(local_path)
@@ -150,11 +102,11 @@
# Stamps the local file for +entry+ with the entry's :modified time as its
# mtime; the access time is set to the current time.
def update_file_timestamp(entry)
  target = relative_to_local_path(entry[:path])
  File.utime(Time.now, entry[:modified], target)
end
def gather_remote_info(entry)
- res = api.metadata(relative_to_remote_path(entry[:path]), entry[:hash])
+ res = api.metadata(relative_to_remote_path(entry[:path]), entry[:remote_hash])
case res
when Hash
out = process_basic_remote_props(res)
out[:id] = entry[:id] if entry[:id]
if res[:contents]
@@ -173,15 +125,16 @@
end
end
# Builds a hash of entry properties from a remote metadata response +res+.
# Always set: :path (made relative via remote_to_relative_path),
# :modified (via parse_time) and :is_dir.
# Old version ("-"): copied res[:hash] to :hash and res[:revision] to
# :revision when present.
# New version ("+"): stores res[:hash] under :remote_hash, reads the
# revision from res[:rev], and adds :size from res[:bytes]; each of these
# optional keys is set only when the source value is truthy.
# Returns the assembled hash.
def process_basic_remote_props(res)
out = {}
- out[:path] = remote_to_relative_path(res[:path])
- out[:modified] = parse_time(res[:modified])
- out[:is_dir] = res[:is_dir]
- out[:hash] = res[:hash] if res[:hash]
- out[:revision] = res[:revision] if res[:revision]
+ out[:path] = remote_to_relative_path(res[:path])
+ out[:modified] = parse_time(res[:modified])
+ out[:is_dir] = res[:is_dir]
+ out[:remote_hash] = res[:hash] if res[:hash]
+ out[:revision] = res[:rev] if res[:rev]
+ out[:size] = res[:bytes] if res[:bytes]
out
end
def generate_tmpfilename(path)
out = File.join(local_path, ".#{path.gsub(/\W/, '-')}.part")
@@ -226,36 +179,46 @@
c[:parent_id] ||= lookup_id_by_path(c[:parent_path])
if c[:is_dir]
# directory creation cannot go in a thread, since later
# operations might depend on the directory being there
create_dir(c)
- database.add_entry(c[:path], true, c[:parent_id], c[:modified], c[:revision], c[:hash])
+ database.add_entry(c[:path], true, c[:parent_id], c[:modified], c[:revision], c[:remote_hash], nil)
changelist[:created] << c[:path]
else
ptasks.add do
begin
- create_file(c)
- database.add_entry(c[:path], false, c[:parent_id], c[:modified], c[:revision], c[:hash])
+ res = create_file(c)
+ local_hash = calculate_hash(relative_to_local_path(c[:path]))
+ database.add_entry(c[:path], false, c[:parent_id], c[:modified], c[:revision], c[:remote_hash], local_hash)
changelist[:created] << c[:path]
+ if res.kind_of?(Array) && res[0] == :conflict
+ changelist[:conflicts] ||= []
+ changelist[:conflicts] << res[1]
+ end
rescue Dbox::ServerError => e
log.error "Error while downloading #{c[:path]}: #{e.inspect}"
parent_ids_of_failed_entries << c[:parent_id]
changelist[:failed] << { :operation => :create, :path => c[:path], :error => e }
end
end
end
when :update
if c[:is_dir]
update_dir(c)
- database.update_entry_by_path(c[:path], :modified => c[:modified], :revision => c[:revision], :hash => c[:hash])
+ database.update_entry_by_path(c[:path], :modified => c[:modified], :revision => c[:revision], :remote_hash => c[:remote_hash])
changelist[:updated] << c[:path]
else
ptasks.add do
begin
- update_file(c)
- database.update_entry_by_path(c[:path], :modified => c[:modified], :revision => c[:revision], :hash => c[:hash])
+ res = update_file(c)
+ local_hash = calculate_hash(relative_to_local_path(c[:path]))
+ database.update_entry_by_path(c[:path], :modified => c[:modified], :revision => c[:revision], :remote_hash => c[:remote_hash], :local_hash => local_hash)
changelist[:updated] << c[:path]
+ if res.kind_of?(Array) && res[0] == :conflict
+ changelist[:conflicts] ||= []
+ changelist[:conflicts] << res[1]
+ end
rescue Dbox::ServerError => e
log.error "Error while downloading #{c[:path]}: #{e.inspect}"
parent_ids_of_failed_entries << c[:parent_id]
changelist[:failed] << { :operation => :create, :path => c[:path], :error => e }
end
@@ -274,15 +237,15 @@
ptasks.finish
# clear hashes on any dirs with children that failed so that
# they are processed again on next pull
parent_ids_of_failed_entries.uniq.each do |id|
- database.update_entry_by_id(id, :hash => nil)
+ database.update_entry_by_id(id, :remote_hash => nil)
end
# sort & return output
- changelist.keys.each {|k| changelist[k].sort! }
+ changelist.keys.each {|k| k == :conflicts ? changelist[k].sort! {|c1, c2| c1[:original] <=> c2[:original] } : changelist[k].sort! }
changelist
end
def calculate_changes(dir, operation = :update)
raise(ArgumentError, "Not a directory: #{dir.inspect}") unless dir[:is_dir]
@@ -315,11 +278,11 @@
if entry = existing_entries[c[:path]]
c[:id] = entry[:id]
c[:modified] = parse_time(c[:modified])
if c[:is_dir]
# queue dir for later
- c[:hash] = entry[:hash]
+ c[:remote_hash] = entry[:remote_hash]
recur_dirs << [:update, c]
else
# update iff modified
out << [:update, c] if modified?(entry, c)
end
@@ -353,13 +316,13 @@
out
end
# Decides whether the remote listing +res+ differs from the stored +entry+.
# An entry counts as modified when the revisions differ or the modified
# times differ. When +res+ carries a hash key, a hash mismatch also counts;
# because of `||=` that comparison only runs if the earlier checks were
# false.
# Old version ("-"): compared time_to_s strings and the :hash key.
# New version ("+"): compares times with times_equal? and uses :remote_hash.
# Logs the compared values at debug level and returns the boolean result.
def modified?(entry, res)
out = (entry[:revision] != res[:revision]) ||
- (time_to_s(entry[:modified]) != time_to_s(res[:modified]))
- out ||= (entry[:hash] != res[:hash]) if res.has_key?(:hash)
- log.debug "#{entry[:path]} modified? r#{entry[:revision]} vs. r#{res[:revision]}, h#{entry[:hash]} vs. h#{res[:hash]}, t#{time_to_s(entry[:modified])} vs. t#{time_to_s(res[:modified])} => #{out}"
+ !times_equal?(entry[:modified], res[:modified])
+ out ||= (entry[:remote_hash] != res[:remote_hash]) if res.has_key?(:remote_hash)
+ log.debug "#{entry[:path]} modified? r#{entry[:revision]} vs. r#{res[:revision]}, h#{entry[:remote_hash]} vs. h#{res[:remote_hash]}, t#{time_to_s(entry[:modified])} vs. t#{time_to_s(res[:modified])} => #{out}"
out
end
def create_dir(dir)
local_path = relative_to_local_path(dir[:path])
@@ -402,19 +365,47 @@
# Downloads the remote file described by +file+ into its local location.
# The content is first written to a temp file (generate_tmpfilename), then
# moved onto the real path, and finally the local mtime is set from
# file[:modified] via update_file_timestamp.
#
# Old version ("-"): always overwrote the local file and returned whatever
# update_file_timestamp returned.
# New version ("+"):
#   * detects "clobbering": for a tracked path, the current local hash
#     differs from the stored :local_hash; for an untracked path, a local
#     file already exists;
#   * asks api.get_file to stream when file[:size] exceeds
#     MIN_BYTES_TO_STREAM_DOWNLOAD;
#   * on a clobber, renames the existing local file to a path from
#     find_nonconflicting_path before moving the download into place;
#   * returns [:conflict, { :original => ..., :renamed => ... }] when a
#     backup was made, otherwise true.
#
# NOTE(review): File.exists? is a deprecated alias of File.exist? and is
# absent from Ruby 3.2+; the "+" lines would need File.exist? there.
# NOTE(review): the temp file is opened with mode "w" (text mode) --
# confirm this is safe for binary content on all target platforms.
def download_file(file)
local_path = relative_to_local_path(file[:path])
remote_path = relative_to_remote_path(file[:path])
- # stream download to temp file, then atomic move to real path
+ # check to ensure we aren't overwriting an untracked file or a
+ # file with local modifications
+ clobbering = false
+ if entry = database.find_by_path(file[:path])
+ clobbering = calculate_hash(local_path) != entry[:local_hash]
+ else
+ clobbering = File.exists?(local_path)
+ end
+
+ # stream files larger than the minimum
+ stream = file[:size] && file[:size] > MIN_BYTES_TO_STREAM_DOWNLOAD
+
+ # download to temp file
tmp = generate_tmpfilename(file[:path])
File.open(tmp, "w") do |f|
- api.get_file(remote_path, f)
+ api.get_file(remote_path, f, stream)
end
- FileUtils.mv(tmp, local_path)
+ # rename old file if clobbering
+ if clobbering && File.exists?(local_path)
+ backup_path = find_nonconflicting_path(local_path)
+ FileUtils.mv(local_path, backup_path)
+ backup_relpath = local_to_relative_path(backup_path)
+ log.warn "#{file[:path]} had a conflict and the existing copy was renamed to #{backup_relpath} locally"
+ end
+
+ # atomic move over to the real file, and update the timestamp
+ FileUtils.mv(tmp, local_path)
update_file_timestamp(file)
+
# backup_relpath is only assigned inside the rename branch above; when that
# branch did not run the local is nil, so this reports "no conflict".
+ if backup_relpath
+ [:conflict, { :original => file[:path], :renamed => backup_relpath }]
+ else
+ true
+ end
end
+
end
class Push < Operation
def initialize(database, api)
super(database, api)
@@ -443,21 +434,28 @@
if c[:is_dir]
# directory creation cannot go in a thread, since later
# operations might depend on the directory being there
create_dir(c)
- database.add_entry(c[:path], true, c[:parent_id], nil, nil, nil)
+ database.add_entry(c[:path], true, c[:parent_id], nil, nil, nil, nil)
force_metadata_update_from_server(c)
changelist[:created] << c[:path]
else
# spin up a thread to upload the file
ptasks.add do
begin
- upload_file(c)
- database.add_entry(c[:path], false, c[:parent_id], nil, nil, nil)
- force_metadata_update_from_server(c)
- changelist[:created] << c[:path]
+ local_hash = calculate_hash(relative_to_local_path(c[:path]))
+ res = upload_file(c)
+ database.add_entry(c[:path], false, c[:parent_id], nil, nil, nil, local_hash)
+ if c[:path] == res[:path]
+ force_metadata_update_from_server(c)
+ changelist[:created] << c[:path]
+ else
+ log.warn "#{c[:path]} had a conflict and was renamed to #{res[:path]} on the server"
+ changelist[:conflicts] ||= []
+ changelist[:conflicts] << { :original => c[:path], :renamed => res[:path] }
+ end
rescue Dbox::ServerError => e
log.error "Error while uploading #{c[:path]}: #{e.inspect}"
changelist[:failed] << { :operation => :create, :path => c[:path], :error => e }
end
end
@@ -472,13 +470,21 @@
if !c[:is_dir]
# spin up a thread to upload the file
ptasks.add do
begin
- upload_file(c)
- force_metadata_update_from_server(c)
- changelist[:updated] << c[:path]
+ local_hash = calculate_hash(relative_to_local_path(c[:path]))
+ res = upload_file(c)
+ database.update_entry_by_path(c[:path], :local_hash => local_hash)
+ if c[:path] == res[:path]
+ force_metadata_update_from_server(c)
+ changelist[:updated] << c[:path]
+ else
+ log.warn "#{c[:path]} had a conflict and was renamed to #{res[:path]} on the server"
+ changelist[:conflicts] ||= []
+ changelist[:conflicts] << { :original => c[:path], :renamed => res[:path] }
+ end
rescue Dbox::ServerError => e
log.error "Error while uploading #{c[:path]}: #{e.inspect}"
changelist[:failed] << { :operation => :update, :path => c[:path], :error => e }
end
end
@@ -524,11 +530,11 @@
existing_entries = current_dir_entries_as_hash(dir)
child_paths = list_contents(dir).sort
child_paths.each do |p|
- c = { :path => p, :modified => mtime(p), :is_dir => is_dir(p), :parent_path => dir[:path] }
+ c = { :path => p, :modified => mtime(p), :is_dir => is_dir(p), :parent_path => dir[:path], :local_hash => calculate_hash(relative_to_local_path(p)) }
if entry = existing_entries[p]
c[:id] = entry[:id]
recur_dirs << c if c[:is_dir] # queue dir for later
out << [:update, c] if modified?(entry, c) # update iff modified
else
@@ -558,12 +564,20 @@
# Tells whether the relative +path+ resolves to a directory on the local
# filesystem.
def is_dir(path)
  full_path = relative_to_local_path(path)
  File.directory?(full_path)
end
# Decides whether the local item +res+ differs from the stored +entry+.
# Old version ("-"): compared the time_to_s form of the two :modified
# values for every entry.
# New version ("+"): directories are still compared by modification time
# (times_equal?); files are compared by :local_hash and are treated as
# modified unless both hashes are present and equal, so a missing hash on
# either side counts as modified.
# Logs the compared values at debug level and returns the boolean result.
def modified?(entry, res)
- out = time_to_s(entry[:modified]) != time_to_s(res[:modified])
- log.debug "#{entry[:path]} modified? t#{time_to_s(entry[:modified])} vs. t#{time_to_s(res[:modified])} => #{out}"
+ out = true
+ if entry[:is_dir]
+ out = !times_equal?(entry[:modified], res[:modified])
+ log.debug "#{entry[:path]} modified? t#{time_to_s(entry[:modified])} vs. t#{time_to_s(res[:modified])} => #{out}"
+ else
+ eh = entry[:local_hash]
+ rh = res[:local_hash]
+ out = !(eh && rh && eh == rh)
+ log.debug "#{entry[:path]} modified? #{eh} vs. #{rh} => #{out}"
+ end
out
end
def list_contents(dir)
local_path = relative_to_local_path(dir[:path])
@@ -589,17 +603,20 @@
# Uploads the local file for +file+ to its remote path.
# Old version ("-"): called api.put_file(remote_path, f); the method
# returned that call's result.
# New version ("+"): looks up the database entry for the path and passes
# its :revision (nil when there is no entry) as a third argument to
# api.put_file, then returns the response normalized by
# process_basic_remote_props. Callers compare the returned :path with the
# requested path to detect a rename on the server.
# The File.open block form closes the file handle when the block exits.
def upload_file(file)
local_path = relative_to_local_path(file[:path])
remote_path = relative_to_remote_path(file[:path])
File.open(local_path) do |f|
- api.put_file(remote_path, f)
+ db_entry = database.find_by_path(file[:path])
+ last_revision = db_entry ? db_entry[:revision] : nil
+ res = api.put_file(remote_path, f, last_revision)
+ process_basic_remote_props(res)
end
end
def force_metadata_update_from_server(entry)
res = gather_remote_info(entry)
unless res == :not_modified
- database.update_entry_by_path(entry[:path], :modified => res[:modified], :revision => res[:revision], :hash => res[:hash])
+ database.update_entry_by_path(entry[:path], :modified => res[:modified], :revision => res[:revision], :remote_hash => res[:remote_hash])
end
update_file_timestamp(database.find_by_path(entry[:path]))
end
end
end