require 'virtus' require 'celluloid' %w(sources targets helpers).each do |dir| Dir.glob(File.expand_path("../#{dir}/**/*.rb", __FILE__), &method(:require)) end module MultiSync class Client include Virtus.model include MultiSync::Helpers::Pluralize attribute :supervisor, Celluloid::SupervisionGroup attribute :running_upload_jobs, Array, default: [] attribute :running_delete_jobs, Array, default: [] attribute :complete_upload_jobs, Array, default: [] attribute :complete_delete_jobs, Array, default: [] attribute :incomplete_upload_jobs, Array, default: [] attribute :incomplete_delete_jobs, Array, default: [] attribute :sources, Array, default: [] attribute :sync_attempts, Integer, default: 0 attribute :file_sync_attempts, Integer, default: 0 attribute :started_at, Time, required: false attribute :finished_at, Time, required: false SUPPORTED_SOURCE_TYPES = [[:local, MultiSync::LocalSource], [:manifest, MultiSync::ManifestSource]] SUPPORTED_TARGET_TYPES = [[:local, MultiSync::LocalTarget], [:aws, MultiSync::AwsTarget]] # Initialize a new Client object # # @param options [Hash] def initialize(*args) self.supervisor = Celluloid::SupervisionGroup.run! super end # # # def add_target(clazz, options = {}) # TODO: friendly pool names? pool_name = Celluloid.uuid supervisor.pool(clazz, as: pool_name, args: [options], size: MultiSync.target_pool_size) pool_name end # # # def add_source(clazz, options = {}) source = clazz.new(options) sources << source source end # # # def sync MultiSync.warn 'Preventing synchronization as there are no sources found.' && return if sync_pointless? if first_run? MultiSync.debug 'Starting synchronization...' determine_sync else MultiSync.debug 'Restarting synchronization...' end sync_attempted MultiSync.debug 'Fetching upload jobs from the future...' (running_upload_jobs | incomplete_upload_jobs).each do | job | begin complete_upload_jobs << job.value rescue => error self.file_sync_attempts = file_sync_attempts + 1 MultiSync.warn error.inspect incomplete_upload_jobs << job end end MultiSync.debug 'Fetching delete jobs from the future...' (running_delete_jobs | incomplete_delete_jobs).each do | job | begin complete_delete_jobs << job.value rescue => error self.file_sync_attempts = file_sync_attempts + 1 MultiSync.warn error.inspect incomplete_delete_jobs << job end end finish_sync end private def determine_sync sources.each do |source| source_files = [] starting_synchronizing_msg = "ynchronizing: '#{source.source_dir}'" starting_synchronizing_msg.prepend MultiSync.force ? 'Forcefully s' : 'S' MultiSync.debug starting_synchronizing_msg source_files = source.files # when no targets are specified, assume all targets source.targets = supervisor_actor_names if source.targets.empty? source.targets.each do | target_id | missing_files = [] abandoned_files = [] outdated_files = [] MultiSync.debug "#{pluralize(source_files.length, 'file')} found from the source" MultiSync.debug 'Fetching files from the target...' target_files = supervisor[target_id].files MultiSync.debug "#{pluralize(target_files.length, 'file')} found from the target" missing_files.concat determine_missing_files(source_files, target_files) missing_files_msg = "#{missing_files.length} of the files are missing" missing_files_msg += ", however we're skipping them as :upload_missing_files is false" unless MultiSync.upload_missing_files MultiSync.debug missing_files_msg abandoned_files.concat determine_abandoned_files(source_files, target_files) abandoned_files_msg = "#{abandoned_files.length} of the files are abandoned" abandoned_files_msg += ", however we're skipping them as :delete_abandoned_files is false" unless MultiSync.delete_abandoned_files MultiSync.debug abandoned_files_msg # remove missing_files from source_files (as we know they're missing so don't need to check for them) # remove abandoned_files from target_files (as we know they're abandoned so don't need to check for them) outdated_files.concat determine_outdated_files(source_files - missing_files, target_files - abandoned_files) MultiSync.debug "#{outdated_files.length} of the files are outdated" MultiSync.debug 'Scheduling jobs in the future...' # outdated files outdated_files.each do | resource | running_upload_jobs << supervisor[target_id].future.upload(resource) end # missing files if MultiSync.upload_missing_files missing_files.each do | resource | running_upload_jobs << supervisor[target_id].future.upload(resource) end end # abandoned files if MultiSync.delete_abandoned_files abandoned_files.each do | resource | running_delete_jobs << supervisor[target_id].future.delete(resource) end end end end end def sync_attempted self.started_at = Time.now if first_run? self.sync_attempts = sync_attempts.next if sync_attempts > MultiSync.max_sync_attempts MultiSync.warn "Sync was attempted more then #{MultiSync.max_sync_attempts} times" fail ArgumentError, "Sync was attempted more then #{MultiSync.max_sync_attempts} times" end end def finish_sync # recurse when there are incomplete_jobs still incomplete_jobs.length != 0 ? sync : self.finished_at = Time.now if finished_at elapsed = finished_at.to_f - started_at.to_f minutes, seconds = elapsed.divmod 60.0 bytes = complete_upload_jobs_bytes kilobytes = bytes / 1024.0 MultiSync.debug "Sync completed in #{pluralize(minutes.round, 'minute')} and #{pluralize(seconds.round, 'second')}" MultiSync.debug 'The combined upload weight was ' + ((bytes > 1024.0) ? pluralize(kilobytes.round, 'kilobyte') : pluralize(bytes.round, 'byte')) MultiSync.debug "#{pluralize(file_sync_attempts, 'failed request')} were detected and re-tried" else MultiSync.debug "Sync failed to complete with #{pluralize(incomplete_jobs.length, 'outstanding file')} to be synchronised" end MultiSync.debug "#{pluralize(complete_jobs.length, 'file')} were synchronised (#{pluralize(complete_delete_jobs.length, 'deleted file')} and #{pluralize(complete_upload_jobs.length, 'uploaded file')}) from #{pluralize(sources.length, 'source')} to #{pluralize(supervisor_actor_names.length, 'target')}" supervisor.terminate end def complete_jobs complete_upload_jobs | complete_delete_jobs end def incomplete_jobs incomplete_upload_jobs | incomplete_delete_jobs end def complete_upload_jobs_bytes total_bytes = 0 complete_upload_jobs.each do | job | total_bytes += job.content_length || job.determine_content_length || 0 end total_bytes end def determine_missing_files(source_files, target_files) source_files - target_files end def determine_abandoned_files(source_files, target_files) target_files - source_files end def determine_outdated_files(source_files, target_files) outdated_files = [] equivalent_files = [] # TODO: replace with celluloid pool of futures # check each source file against the matching target_file's etag source_files.each_with_index do |file, i| if !file.matching_etag?(target_files[i]) || MultiSync.force outdated_files << file else equivalent_files << file end end # TODO: move to a better place MultiSync.debug "#{equivalent_files.length} of the files are identical" outdated_files end def first_run? sync_attempts == 0 end def sync_pointless? sources.empty? end def supervisor_actor_names supervisor.actors.map(&:registered_name) end end end