lib/rocket_job/dirmon_entry.rb in rocketjob-4.2.0 vs lib/rocket_job/dirmon_entry.rb in rocketjob-4.3.0.beta

- old
+ new

@@ -1,7 +1,6 @@
 require 'concurrent'
-require 'pathname'
 require 'fileutils'
 module RocketJob
   class DirmonEntry
     include Plugins::Document
     include Plugins::StateMachine
@@ -141,21 +140,21 @@
     # Add a path to the whitelist
     # Raises: Errno::ENOENT: No such file or directory
     def self.add_whitelist_path(path)
       # Confirms that path exists
-      path = Pathname.new(path).realpath.to_s
+      path = IOStreams.path(path).realpath.to_s
       whitelist_paths << path
       whitelist_paths.uniq!
       path
     end

     # Deletes a path from the whitelist paths
     # Raises: Errno::ENOENT: No such file or directory
     def self.delete_whitelist_path(path)
       # Confirms that path exists
-      path = Pathname.new(path).realpath.to_s
+      path = IOStreams.path(path).realpath.to_s
       whitelist_paths.delete(path)
       whitelist_paths.uniq!
       path
     end
@@ -184,36 +183,27 @@
     # Passes each filename [Pathname] found that matches the pattern into the supplied block
     def each
       SemanticLogger.named_tagged(dirmon_entry: id.to_s) do
         # Case insensitive filename matching
-        Pathname.glob(pattern, File::FNM_CASEFOLD).each do |pathname|
-          next if pathname.directory?
-          pathname = begin
-            pathname.realpath
-          rescue Errno::ENOENT
-            logger.warn("Unable to expand the realpath for #{pathname.inspect}. Skipping file.")
-            next
-          end
-
-          file_name = pathname.to_s
-
+        IOStreams.each_child(pattern) do |path|
+          path = path.realpath
           # Skip archive directories
-          next if file_name.include?(self.class.default_archive_directory)
+          next if path.to_s.include?(archive_directory || self.class.default_archive_directory)

           # Security check?
-          if whitelist_paths.size.positive? && whitelist_paths.none? { |whitepath| file_name.to_s.start_with?(whitepath) }
-            logger.error "Skipping file: #{file_name} since it is not in any of the whitelisted paths: #{whitelist_paths.join(', ')}"
+          if whitelist_paths.size.positive? && whitelist_paths.none? { |whitepath| path.to_s.start_with?(whitepath) }
+            logger.warn "Skipping file: #{path} since it is not in any of the whitelisted paths: #{whitelist_paths.join(', ')}"
             next
           end

           # File must be writable so it can be removed after processing
-          unless pathname.writable?
-            logger.error "Skipping file: #{file_name} since it is not writable by the current user. Must be able to delete/move the file after queueing the job"
+          if path.respond_to?(:writable?) && !path.writable?
+            logger.warn "Skipping file: #{path} since it is not writable by the current user. Must be able to delete/move the file after queueing the job"
             next
           end

-          yield(pathname)
+          yield(path)
         end
       end
     end

     # Set exception information for this DirmonEntry and fail it
@@ -237,30 +227,31 @@
       job_class_name.constantize
     rescue NameError
       nil
     end

-    # Archives the file and kicks off a proxy job to upload the file.
-    def later(pathname)
-      job_id             = BSON::ObjectId.new
-      archived_file_name = archive_file(job_id, pathname)
+    # Archives the file, then kicks off a file upload job to upload the archived file.
+    def later(iopath)
+      job_id       = BSON::ObjectId.new
+      archive_path = archive_iopath(iopath).join("#{job_id}_#{iopath.basename}")
+      iopath.move_to(archive_path)

       job = RocketJob::Jobs::UploadFileJob.create!(
         job_class_name:     job_class_name,
         properties:         properties,
-        description:        "#{name}: #{pathname.basename}",
-        upload_file_name:   archived_file_name.to_s,
-        original_file_name: pathname.to_s,
+        description:        "#{name}: #{iopath.basename}",
+        upload_file_name:   archive_path.to_s,
+        original_file_name: iopath.to_s,
         job_id:             job_id
       )
       logger.info(
         message: 'Created RocketJob::Jobs::UploadFileJob',
         payload: {
           dirmon_entry_name:  name,
-          upload_file_name:   archived_file_name.to_s,
-          original_file_name: pathname.to_s,
+          upload_file_name:   archive_path.to_s,
+          original_file_name: iopath.to_s,
           job_class_name:     job_class_name,
           job_id:             job_id.to_s,
           upload_job_id:      job.id.to_s
         }
       )

@@ -276,40 +267,16 @@
     end

     class_attribute :whitelist_paths
     self.whitelist_paths = Concurrent::Array.new

-    # Move the file to the archive directory
-    #
-    # The archived file name is prefixed with the job id
-    #
-    # Returns [String] the fully qualified archived file name
-    #
-    # Note:
-    # - Works across partitions when the file and the archive are on different partitions
-    def archive_file(job_id, pathname)
-      target_path = archive_pathname(pathname)
-      target_path.mkpath
-      target_file_name = target_path.join("#{job_id}_#{pathname.basename}")
-      # In case the file is being moved across partitions
-      FileUtils.move(pathname.to_s, target_file_name.to_s)
-      target_file_name.to_s
-    end
-
     # Returns [Pathname] to the archive directory, and creates it if it does not exist.
     #
     # If `archive_directory` is a relative path, it is appended to the `file_pathname`.
     # If `archive_directory` is an absolute path, it is returned as-is.
-    def archive_pathname(file_pathname)
-      path = Pathname.new(archive_directory)
-      path = file_pathname.dirname.join(archive_directory) if path.relative?
-
-      begin
-        path.mkpath unless path.exist?
-      rescue Errno::ENOENT => exc
-        raise(Errno::ENOENT, "DirmonJob failed to create archive directory: #{path}, #{exc.message}")
-      end
-      path.realpath
+    def archive_iopath(iopath)
+      path = IOStreams.path(archive_directory)
+      path.relative? ? iopath.directory.join(archive_directory) : path
     end

     # Validates job_class is a Rocket Job
     def job_is_a_rocket_job
       klass = job_class
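For readers who have not used the IOStreams API that replaces Pathname in this release, below is a minimal stand-alone sketch of the relative-versus-absolute resolution performed by the new archive_iopath helper. The resolve_archive_path wrapper, the sample file names, and the commented results are illustrative assumptions only; IOStreams.path, #relative?, #directory, and #join are the calls that actually appear in the diff above.

  require 'iostreams'

  # Sketch only (not part of rocketjob): mirrors the archive_iopath logic shown above.
  # A relative archive_directory is resolved against the directory that contains the
  # incoming file; an absolute archive_directory is used as-is.
  def resolve_archive_path(file_name, archive_directory)
    iopath = IOStreams.path(file_name)
    path   = IOStreams.path(archive_directory)
    path.relative? ? iopath.directory.join(archive_directory) : path
  end

  resolve_archive_path('/data/in/invoices.csv', 'archive')
  # => path under /data/in/archive (relative directory stays next to the file)
  resolve_archive_path('/data/in/invoices.csv', '/var/archive')
  # => /var/archive (absolute directory used unchanged)

Note that the new helper no longer calls mkpath the way the removed archive_pathname did; in this version, creating the archive directory appears to be left to the IOStreams layer when later calls iopath.move_to(archive_path).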