lib/dbox/syncer.rb in dbox-0.5.3 vs lib/dbox/syncer.rb in dbox-0.6.0

- old (line present in dbox-0.5.3, removed in 0.6.0)
+ new (line added in dbox-0.6.0)

@@ -1,8 +1,9 @@ module Dbox class Syncer MAX_PARALLEL_DBOX_OPS = 5 + MIN_BYTES_TO_STREAM_DOWNLOAD = 1024 * 100 # 100kB include Loggable def self.create(remote_path, local_path) api.create_dir(remote_path) @@ -35,10 +36,11 @@ @@_api ||= API.connect end class Operation include Loggable + include Utils attr_reader :database def initialize(database, api) @database = database @@ -63,42 +65,10 @@ def remote_path metadata[:remote_path] end - def local_to_relative_path(path) - if path.include?(local_path) - path.sub(local_path, "").sub(/^\//, "") - else - raise BadPath, "Not a local path: #{path}" - end - end - - def remote_to_relative_path(path) - if path.include?(remote_path) - path.sub(remote_path, "").sub(/^\//, "") - else - raise BadPath, "Not a remote path: #{path}" - end - end - - def relative_to_local_path(path) - if path && path.length > 0 - File.join(local_path, path) - else - local_path - end - end - - def relative_to_remote_path(path) - if path && path.length > 0 - File.join(remote_path, path) - else - remote_path - end - end - def remove_dotfiles(contents) contents.reject {|c| File.basename(c[:path]).start_with?(".") } end def current_dir_entries_as_hash(dir) @@ -114,33 +84,15 @@ def lookup_id_by_path(path) @_ids ||= {} @_ids[path] ||= database.find_by_path(path)[:id] end - def time_to_s(t) - case t - when Time - # matches dropbox time format - t.utc.strftime("%a, %d %b %Y %H:%M:%S +0000") - when String - t - end - end - - def parse_time(t) - case t - when Time - t - when String - Time.parse(t) - end - end - def saving_timestamp(path) mtime = File.mtime(path) - yield + res = yield File.utime(Time.now, mtime, path) + res end def saving_parent_timestamp(entry, &proc) local_path = relative_to_local_path(entry[:path]) parent = File.dirname(local_path) @@ -150,11 +102,11 @@ def update_file_timestamp(entry) File.utime(Time.now, entry[:modified], relative_to_local_path(entry[:path])) end def gather_remote_info(entry) - res = 
api.metadata(relative_to_remote_path(entry[:path]), entry[:hash]) + res = api.metadata(relative_to_remote_path(entry[:path]), entry[:remote_hash]) case res when Hash out = process_basic_remote_props(res) out[:id] = entry[:id] if entry[:id] if res[:contents] @@ -173,15 +125,16 @@ end end def process_basic_remote_props(res) out = {} - out[:path] = remote_to_relative_path(res[:path]) - out[:modified] = parse_time(res[:modified]) - out[:is_dir] = res[:is_dir] - out[:hash] = res[:hash] if res[:hash] - out[:revision] = res[:revision] if res[:revision] + out[:path] = remote_to_relative_path(res[:path]) + out[:modified] = parse_time(res[:modified]) + out[:is_dir] = res[:is_dir] + out[:remote_hash] = res[:hash] if res[:hash] + out[:revision] = res[:rev] if res[:rev] + out[:size] = res[:bytes] if res[:bytes] out end def generate_tmpfilename(path) out = File.join(local_path, ".#{path.gsub(/\W/, '-')}.part") @@ -226,36 +179,46 @@ c[:parent_id] ||= lookup_id_by_path(c[:parent_path]) if c[:is_dir] # directory creation cannot go in a thread, since later # operations might depend on the directory being there create_dir(c) - database.add_entry(c[:path], true, c[:parent_id], c[:modified], c[:revision], c[:hash]) + database.add_entry(c[:path], true, c[:parent_id], c[:modified], c[:revision], c[:remote_hash], nil) changelist[:created] << c[:path] else ptasks.add do begin - create_file(c) - database.add_entry(c[:path], false, c[:parent_id], c[:modified], c[:revision], c[:hash]) + res = create_file(c) + local_hash = calculate_hash(relative_to_local_path(c[:path])) + database.add_entry(c[:path], false, c[:parent_id], c[:modified], c[:revision], c[:remote_hash], local_hash) changelist[:created] << c[:path] + if res.kind_of?(Array) && res[0] == :conflict + changelist[:conflicts] ||= [] + changelist[:conflicts] << res[1] + end rescue Dbox::ServerError => e log.error "Error while downloading #{c[:path]}: #{e.inspect}" parent_ids_of_failed_entries << c[:parent_id] changelist[:failed] << { 
:operation => :create, :path => c[:path], :error => e } end end end when :update if c[:is_dir] update_dir(c) - database.update_entry_by_path(c[:path], :modified => c[:modified], :revision => c[:revision], :hash => c[:hash]) + database.update_entry_by_path(c[:path], :modified => c[:modified], :revision => c[:revision], :remote_hash => c[:remote_hash]) changelist[:updated] << c[:path] else ptasks.add do begin - update_file(c) - database.update_entry_by_path(c[:path], :modified => c[:modified], :revision => c[:revision], :hash => c[:hash]) + res = update_file(c) + local_hash = calculate_hash(relative_to_local_path(c[:path])) + database.update_entry_by_path(c[:path], :modified => c[:modified], :revision => c[:revision], :remote_hash => c[:remote_hash], :local_hash => local_hash) changelist[:updated] << c[:path] + if res.kind_of?(Array) && res[0] == :conflict + changelist[:conflicts] ||= [] + changelist[:conflicts] << res[1] + end rescue Dbox::ServerError => e log.error "Error while downloading #{c[:path]}: #{e.inspect}" parent_ids_of_failed_entries << c[:parent_id] changelist[:failed] << { :operation => :create, :path => c[:path], :error => e } end @@ -274,15 +237,15 @@ ptasks.finish # clear hashes on any dirs with children that failed so that # they are processed again on next pull parent_ids_of_failed_entries.uniq.each do |id| - database.update_entry_by_id(id, :hash => nil) + database.update_entry_by_id(id, :remote_hash => nil) end # sort & return output - changelist.keys.each {|k| changelist[k].sort! } + changelist.keys.each {|k| k == :conflicts ? changelist[k].sort! {|c1, c2| c1[:original] <=> c2[:original] } : changelist[k].sort! 
} changelist end def calculate_changes(dir, operation = :update) raise(ArgumentError, "Not a directory: #{dir.inspect}") unless dir[:is_dir] @@ -315,11 +278,11 @@ if entry = existing_entries[c[:path]] c[:id] = entry[:id] c[:modified] = parse_time(c[:modified]) if c[:is_dir] # queue dir for later - c[:hash] = entry[:hash] + c[:remote_hash] = entry[:remote_hash] recur_dirs << [:update, c] else # update iff modified out << [:update, c] if modified?(entry, c) end @@ -353,13 +316,13 @@ out end def modified?(entry, res) out = (entry[:revision] != res[:revision]) || - (time_to_s(entry[:modified]) != time_to_s(res[:modified])) - out ||= (entry[:hash] != res[:hash]) if res.has_key?(:hash) - log.debug "#{entry[:path]} modified? r#{entry[:revision]} vs. r#{res[:revision]}, h#{entry[:hash]} vs. h#{res[:hash]}, t#{time_to_s(entry[:modified])} vs. t#{time_to_s(res[:modified])} => #{out}" + !times_equal?(entry[:modified], res[:modified]) + out ||= (entry[:remote_hash] != res[:remote_hash]) if res.has_key?(:remote_hash) + log.debug "#{entry[:path]} modified? r#{entry[:revision]} vs. r#{res[:revision]}, h#{entry[:remote_hash]} vs. h#{res[:remote_hash]}, t#{time_to_s(entry[:modified])} vs. 
t#{time_to_s(res[:modified])} => #{out}" out end def create_dir(dir) local_path = relative_to_local_path(dir[:path]) @@ -402,19 +365,47 @@ def download_file(file) local_path = relative_to_local_path(file[:path]) remote_path = relative_to_remote_path(file[:path]) - # stream download to temp file, then atomic move to real path + # check to ensure we aren't overwriting an untracked file or a + # file with local modifications + clobbering = false + if entry = database.find_by_path(file[:path]) + clobbering = calculate_hash(local_path) != entry[:local_hash] + else + clobbering = File.exists?(local_path) + end + + # stream files larger than the minimum + stream = file[:size] && file[:size] > MIN_BYTES_TO_STREAM_DOWNLOAD + + # download to temp file tmp = generate_tmpfilename(file[:path]) File.open(tmp, "w") do |f| - api.get_file(remote_path, f) + api.get_file(remote_path, f, stream) end - FileUtils.mv(tmp, local_path) + # rename old file if clobbering + if clobbering && File.exists?(local_path) + backup_path = find_nonconflicting_path(local_path) + FileUtils.mv(local_path, backup_path) + backup_relpath = local_to_relative_path(backup_path) + log.warn "#{file[:path]} had a conflict and the existing copy was renamed to #{backup_relpath} locally" + end + + # atomic move over to the real file, and update the timestamp + FileUtils.mv(tmp, local_path) update_file_timestamp(file) + + if backup_relpath + [:conflict, { :original => file[:path], :renamed => backup_relpath }] + else + true + end end + end class Push < Operation def initialize(database, api) super(database, api) @@ -443,21 +434,28 @@ if c[:is_dir] # directory creation cannot go in a thread, since later # operations might depend on the directory being there create_dir(c) - database.add_entry(c[:path], true, c[:parent_id], nil, nil, nil) + database.add_entry(c[:path], true, c[:parent_id], nil, nil, nil, nil) force_metadata_update_from_server(c) changelist[:created] << c[:path] else # spin up a thread to upload the file 
ptasks.add do begin - upload_file(c) - database.add_entry(c[:path], false, c[:parent_id], nil, nil, nil) - force_metadata_update_from_server(c) - changelist[:created] << c[:path] + local_hash = calculate_hash(relative_to_local_path(c[:path])) + res = upload_file(c) + database.add_entry(c[:path], false, c[:parent_id], nil, nil, nil, local_hash) + if c[:path] == res[:path] + force_metadata_update_from_server(c) + changelist[:created] << c[:path] + else + log.warn "#{c[:path]} had a conflict and was renamed to #{res[:path]} on the server" + changelist[:conflicts] ||= [] + changelist[:conflicts] << { :original => c[:path], :renamed => res[:path] } + end rescue Dbox::ServerError => e log.error "Error while uploading #{c[:path]}: #{e.inspect}" changelist[:failed] << { :operation => :create, :path => c[:path], :error => e } end end @@ -472,13 +470,21 @@ if !c[:is_dir] # spin up a thread to upload the file ptasks.add do begin - upload_file(c) - force_metadata_update_from_server(c) - changelist[:updated] << c[:path] + local_hash = calculate_hash(relative_to_local_path(c[:path])) + res = upload_file(c) + database.update_entry_by_path(c[:path], :local_hash => local_hash) + if c[:path] == res[:path] + force_metadata_update_from_server(c) + changelist[:updated] << c[:path] + else + log.warn "#{c[:path]} had a conflict and was renamed to #{res[:path]} on the server" + changelist[:conflicts] ||= [] + changelist[:conflicts] << { :original => c[:path], :renamed => res[:path] } + end rescue Dbox::ServerError => e log.error "Error while uploading #{c[:path]}: #{e.inspect}" changelist[:failed] << { :operation => :update, :path => c[:path], :error => e } end end @@ -524,11 +530,11 @@ existing_entries = current_dir_entries_as_hash(dir) child_paths = list_contents(dir).sort child_paths.each do |p| - c = { :path => p, :modified => mtime(p), :is_dir => is_dir(p), :parent_path => dir[:path] } + c = { :path => p, :modified => mtime(p), :is_dir => is_dir(p), :parent_path => dir[:path], 
:local_hash => calculate_hash(relative_to_local_path(p)) } if entry = existing_entries[p] c[:id] = entry[:id] recur_dirs << c if c[:is_dir] # queue dir for later out << [:update, c] if modified?(entry, c) # update iff modified else @@ -558,12 +564,20 @@ def is_dir(path) File.directory?(relative_to_local_path(path)) end def modified?(entry, res) - out = time_to_s(entry[:modified]) != time_to_s(res[:modified]) - log.debug "#{entry[:path]} modified? t#{time_to_s(entry[:modified])} vs. t#{time_to_s(res[:modified])} => #{out}" + out = true + if entry[:is_dir] + out = !times_equal?(entry[:modified], res[:modified]) + log.debug "#{entry[:path]} modified? t#{time_to_s(entry[:modified])} vs. t#{time_to_s(res[:modified])} => #{out}" + else + eh = entry[:local_hash] + rh = res[:local_hash] + out = !(eh && rh && eh == rh) + log.debug "#{entry[:path]} modified? #{eh} vs. #{rh} => #{out}" + end out end def list_contents(dir) local_path = relative_to_local_path(dir[:path]) @@ -589,17 +603,20 @@ def upload_file(file) local_path = relative_to_local_path(file[:path]) remote_path = relative_to_remote_path(file[:path]) File.open(local_path) do |f| - api.put_file(remote_path, f) + db_entry = database.find_by_path(file[:path]) + last_revision = db_entry ? db_entry[:revision] : nil + res = api.put_file(remote_path, f, last_revision) + process_basic_remote_props(res) end end def force_metadata_update_from_server(entry) res = gather_remote_info(entry) unless res == :not_modified - database.update_entry_by_path(entry[:path], :modified => res[:modified], :revision => res[:revision], :hash => res[:hash]) + database.update_entry_by_path(entry[:path], :modified => res[:modified], :revision => res[:revision], :remote_hash => res[:remote_hash]) end update_file_timestamp(database.find_by_path(entry[:path])) end end end