lib/rocket_job/dirmon_entry.rb in rocketjob-4.2.0 vs lib/rocket_job/dirmon_entry.rb in rocketjob-4.3.0.beta
- old
+ new
@@ -1,7 +1,6 @@
require 'concurrent'
-require 'pathname'
require 'fileutils'
module RocketJob
class DirmonEntry
include Plugins::Document
include Plugins::StateMachine
@@ -141,21 +140,21 @@
# Adds the real (fully resolved) form of `path` to the whitelist, without duplicates, and returns it as a String
# Raises: Errno::ENOENT: No such file or directory
def self.add_whitelist_path(path)
# Resolving the real path also confirms that the path exists
- path = Pathname.new(path).realpath.to_s
+ path = IOStreams.path(path).realpath.to_s
whitelist_paths << path
whitelist_paths.uniq!
path
end
# Deletes the real (fully resolved) form of `path` from the whitelist and returns it as a String
# Raises: Errno::ENOENT: No such file or directory
def self.delete_whitelist_path(path)
# Resolving the real path also confirms that the path exists
- path = Pathname.new(path).realpath.to_s
+ path = IOStreams.path(path).realpath.to_s
whitelist_paths.delete(path)
whitelist_paths.uniq!
path
end
@@ -184,36 +183,27 @@
# Yields the real path of each file matching the pattern to the supplied block, skipping archived, non-whitelisted and non-writable files
def each
SemanticLogger.named_tagged(dirmon_entry: id.to_s) do
# Case insensitive filename matching
- Pathname.glob(pattern, File::FNM_CASEFOLD).each do |pathname|
- next if pathname.directory?
- pathname = begin
- pathname.realpath
- rescue Errno::ENOENT
- logger.warn("Unable to expand the realpath for #{pathname.inspect}. Skipping file.")
- next
- end
-
- file_name = pathname.to_s
-
+ IOStreams.each_child(pattern) do |path|
+ path = path.realpath
# Skip archive directories
- next if file_name.include?(self.class.default_archive_directory)
+ next if path.to_s.include?(archive_directory || self.class.default_archive_directory)
# Security check: when a whitelist is configured, only files under a whitelisted path are processed
- if whitelist_paths.size.positive? && whitelist_paths.none? { |whitepath| file_name.to_s.start_with?(whitepath) }
- logger.error "Skipping file: #{file_name} since it is not in any of the whitelisted paths: #{whitelist_paths.join(', ')}"
+ if whitelist_paths.size.positive? && whitelist_paths.none? { |whitepath| path.to_s.start_with?(whitepath) }
+ logger.warn "Skipping file: #{path} since it is not in any of the whitelisted paths: #{whitelist_paths.join(', ')}"
next
end
# File must be writable so it can be removed after processing
- unless pathname.writable?
- logger.error "Skipping file: #{file_name} since it is not writable by the current user. Must be able to delete/move the file after queueing the job"
+ if path.respond_to?(:writable?) && !path.writable?
+ logger.warn "Skipping file: #{path} since it is not writable by the current user. Must be able to delete/move the file after queueing the job"
next
end
- yield(pathname)
+ yield(path)
end
end
end
# Set exception information for this DirmonEntry and fail it
@@ -237,30 +227,31 @@
job_class_name.constantize
rescue NameError
nil
end
- # Archives the file and kicks off a proxy job to upload the file.
- def later(pathname)
- job_id = BSON::ObjectId.new
- archived_file_name = archive_file(job_id, pathname)
+ # Archives the file, then kicks off a file upload job to upload the archived file.
+ def later(iopath)
+ job_id = BSON::ObjectId.new
+ archive_path = archive_iopath(iopath).join("#{job_id}_#{iopath.basename}")
+ iopath.move_to(archive_path)
job = RocketJob::Jobs::UploadFileJob.create!(
job_class_name: job_class_name,
properties: properties,
- description: "#{name}: #{pathname.basename}",
- upload_file_name: archived_file_name.to_s,
- original_file_name: pathname.to_s,
+ description: "#{name}: #{iopath.basename}",
+ upload_file_name: archive_path.to_s,
+ original_file_name: iopath.to_s,
job_id: job_id
)
logger.info(
message: 'Created RocketJob::Jobs::UploadFileJob',
payload: {
dirmon_entry_name: name,
- upload_file_name: archived_file_name.to_s,
- original_file_name: pathname.to_s,
+ upload_file_name: archive_path.to_s,
+ original_file_name: iopath.to_s,
job_class_name: job_class_name,
job_id: job_id.to_s,
upload_job_id: job.id.to_s
}
)
@@ -276,40 +267,16 @@
end
class_attribute :whitelist_paths
self.whitelist_paths = Concurrent::Array.new # Starts empty; while empty, #each does not apply the whitelist check
- # Move the file to the archive directory
- #
- # The archived file name is prefixed with the job id
- #
- # Returns [String] the fully qualified archived file name
- #
- # Note:
- # - Works across partitions when the file and the archive are on different partitions
- def archive_file(job_id, pathname)
- target_path = archive_pathname(pathname)
- target_path.mkpath
- target_file_name = target_path.join("#{job_id}_#{pathname.basename}")
- # In case the file is being moved across partitions
- FileUtils.move(pathname.to_s, target_file_name.to_s)
- target_file_name.to_s
- end
-
# Returns the path of the archive directory to use for the supplied file.
# Note: this only builds the path; creating the directory (mkpath) was part of the removed `archive_pathname`.
# If `archive_directory` is a relative path, it is joined onto the directory that contains the file.
# If `archive_directory` is an absolute path, it is returned as-is.
- def archive_pathname(file_pathname)
- path = Pathname.new(archive_directory)
- path = file_pathname.dirname.join(archive_directory) if path.relative?
-
- begin
- path.mkpath unless path.exist?
- rescue Errno::ENOENT => exc
- raise(Errno::ENOENT, "DirmonJob failed to create archive directory: #{path}, #{exc.message}")
- end
- path.realpath
+ def archive_iopath(iopath)
+ path = IOStreams.path(archive_directory)
+ path.relative? ? iopath.directory.join(archive_directory) : path
end
# Validates job_class is a Rocket Job
def job_is_a_rocket_job
klass = job_class