lib/rocket_job/dirmon_entry.rb in rocketjob-3.4.3 vs lib/rocket_job/dirmon_entry.rb in rocketjob-3.5.0

- old
+ new

@@ -4,13 +4,18 @@ module RocketJob class DirmonEntry include Plugins::Document include Plugins::StateMachine + # The default archive directory that is used when the job being queued does not respond + # to #upload, and does not have an `archive_directory` specified in this entry + class_attribute :default_archive_directory + self.default_archive_directory = 'archive'.freeze + store_in collection: 'rocket_job.dirmon_entries' - # User defined name used to identify this DirmonEntry in Mission Control + # User defined name used to identify this DirmonEntry in the Web Interface. field :name, type: String # Pattern for finding files # # Example: All files ending in '.csv' in the input_files/process1 directory @@ -46,22 +51,22 @@ # file again. # # If supplied, the file will be moved to this directory before the job is started # If the file was in a sub-directory, the corresponding sub-directory will # be created in the archive directory. - field :archive_directory, type: String + field :archive_directory, type: String, default: default_archive_directory # If this DirmonEntry is in the failed state, exception contains the cause embeds_one :exception, class_name: 'RocketJob::JobException' # The maximum number of files that should ever match during a single poll of the pattern. # # Too many files could be as a result of an invalid pattern specification. # Exceeding this number will result in an exception being logged in a failed Dirmon instance. # Dirmon processing will continue with new instances. # TODO: Implement max_hits - #field :max_hits, type: Integer, default: 100 + # field :max_hits, type: Integer, default: 100 # # Read-only attributes # @@ -69,10 +74,15 @@ field :state, type: Symbol, default: :pending # Unique index on pattern to help prevent two entries from scanning the same files index({pattern: 1}, background: true, unique: true, drop_dups: true) + before_validation :strip_whitespace + validates_presence_of :pattern, :job_class_name, :archive_directory + validate :job_is_a_rocket_job + validate :job_has_properties + # State Machine events and transitions # # :pending -> :enabled -> :disabled # -> :failed # -> :failed -> :active @@ -106,31 +116,10 @@ event :fail, before: :set_exception do transitions from: :enabled, to: :failed end end - # @formatter:on - validates_presence_of :pattern, :job_class_name - - validates_each :job_class_name do |record, attr, value| - exists = - begin - value.nil? ? false : record.job_class.ancestors.include?(RocketJob::Job) - rescue NameError - false - end - record.errors.add(attr, 'job_class_name must be defined and must be derived from RocketJob::Job') unless exists - end - - validates_each :properties do |record, attr, value| - if record.job_class && (methods = record.job_class.instance_methods) - value.each_pair do |k, v| - record.errors.add(attr, "Unknown property: #{k.inspect} with value: #{v}") unless methods.include?("#{k}=".to_sym) - end - end - end - # Security Settings # # A whitelist of paths from which to process files. # This prevents accidental or malicious `pattern`s from processing files from anywhere # in the system that the user under which Dirmon is running can access. @@ -145,30 +134,30 @@ # # Default: [] ==> Do not enforce whitelists # # Returns [Array<String>] a copy of the whitelisted paths def self.get_whitelist_paths - self.whitelist_paths.dup + whitelist_paths.dup end # Add a path to the whitelist # Raises: Errno::ENOENT: No such file or directory def self.add_whitelist_path(path) # Confirms that path exists path = Pathname.new(path).realpath.to_s - self.whitelist_paths << path - self.whitelist_paths.uniq! + whitelist_paths << path + whitelist_paths.uniq! path end # Deletes a path from the whitelist paths # Raises: Errno::ENOENT: No such file or directory def self.delete_whitelist_path(path) # Confirms that path exists path = Pathname.new(path).realpath.to_s - self.whitelist_paths.delete(path) - self.whitelist_paths.uniq! + whitelist_paths.delete(path) + whitelist_paths.uniq! path end # Returns [Hash<String:Integer>] of the number of dirmon entries in each state. # Note: If there are no workers in that particular state then the hash will not have a value for it. @@ -191,34 +180,12 @@ counts[result['_id'].to_sym] = result['count'] end counts end - # The default archive directory that is used when the job being queued does not respond - # to #upload, and does not have an `archive_directory` specified in this entry - class_attribute :default_archive_directory - - self.default_archive_directory = '_archive'.freeze - - # Returns [Pathname] the archive_directory if set, otherwise the default_archive_directory - # Creates the archive directory if one is set - def archive_pathname(file_pathname) - if archive_directory - path = Pathname.new(archive_directory) - begin - path.mkpath unless path.exist? - rescue Errno::ENOENT => exc - raise(Errno::ENOENT, "DirmonJob failed to create archive directory: #{path}, #{exc.message}") - end - path.realpath - else - file_pathname.dirname.join(self.class.default_archive_directory).realdirpath - end - end - # Passes each filename [Pathname] found that matches the pattern into the supplied block - def each(&block) + def each SemanticLogger.named_tagged(dirmon_entry: id.to_s) do # Case insensitive filename matching Pathname.glob(pattern, File::FNM_CASEFOLD).each do |pathname| next if pathname.directory? pathname = begin @@ -232,21 +199,21 @@ # Skip archive directories next if file_name.include?(self.class.default_archive_directory) # Security check? - if (whitelist_paths.size > 0) && whitelist_paths.none? { |whitepath| file_name.to_s.start_with?(whitepath) } + if whitelist_paths.size.positive? && whitelist_paths.none? { |whitepath| file_name.to_s.start_with?(whitepath) } logger.error "Skipping file: #{file_name} since it is not in any of the whitelisted paths: #{whitelist_paths.join(', ')}" next end # File must be writable so it can be removed after processing unless pathname.writable? logger.error "Skipping file: #{file_name} since it is not writable by the current user. Must be able to delete/move the file after queueing the job" next end - block.call(pathname) + yield(pathname) end end end # Set exception information for this DirmonEntry and fail it @@ -262,74 +229,104 @@ worker_name: worker_name ) end end - # Returns the Job to be queued + # Returns the Job to be created. def job_class return if job_class_name.nil? job_class_name.constantize rescue NameError nil end - # Queues the job for the supplied pathname + # Archives the file and kicks off a proxy job to upload the file. def later(pathname) - if klass = job_class - logger.measure_info "Enqueued: #{name}, Job class: #{job_class_name}" do - job = klass.new(properties) - upload_file(job, pathname) - job.save! - job - end - else - raise(ArgumentError, "Cannot instantiate a class for: #{job_class_name.inspect}") - end + job_id = BSON::ObjectId.new + archived_file_name = archive_file(job_id, pathname) + + job = RocketJob::Jobs::UploadFileJob.create!( + job_class_name: job_class_name, + properties: properties, + description: "#{name}: #{pathname.basename}", + upload_file_name: archived_file_name.to_s, + original_file_name: pathname.to_s, + job_id: job_id + ) + + logger.info( + message: 'Created RocketJob::Jobs::UploadFileJob', + payload: { + dirmon_entry_name: name, + upload_file_name: archived_file_name.to_s, + original_file_name: pathname.to_s, + job_class_name: job_class_name, + job_id: job_id.to_s, + upload_job_id: job.id.to_s + } + ) + job end private + # strip whitespaces from all variables that reference paths or patterns + def strip_whitespace + self.pattern = pattern.strip unless pattern.nil? + self.archive_directory = archive_directory.strip unless archive_directory.nil? + end + class_attribute :whitelist_paths self.whitelist_paths = Concurrent::Array.new - # Upload the file to the job - def upload_file(job, pathname) - if job.respond_to?(:upload) - # With RocketJob Pro the file can be uploaded directly into the Job itself - job.upload(pathname.to_s) - archive_directory ? archive_file(job, pathname) : pathname.unlink - else - upload_default(job, pathname) - end - end - - # Archives the file for a job where there was no #upload method - def upload_default(job, pathname) - full_file_name = archive_file(job, pathname) - if job.respond_to?(:upload_file_name=) - job.upload_file_name = full_file_name - elsif job.respond_to?(:full_file_name=) - job.full_file_name = full_file_name - else - raise(ArgumentError, "#{job_class_name} must either have attribute 'upload_file_name' or 'full_file_name'") - end - end - # Move the file to the archive directory # # The archived file name is prefixed with the job id # # Returns [String] the fully qualified archived file name # # Note: # - Works across partitions when the file and the archive are on different partitions - def archive_file(job, pathname) + def archive_file(job_id, pathname) target_path = archive_pathname(pathname) target_path.mkpath - target_file_name = target_path.join("#{job.id}_#{pathname.basename}") + target_file_name = target_path.join("#{job_id}_#{pathname.basename}") # In case the file is being moved across partitions FileUtils.move(pathname.to_s, target_file_name.to_s) target_file_name.to_s end + # Returns [Pathname] to the archive directory, and creates it if it does not exist. + # + # If `archive_directory` is a relative path, it is appended to the `file_pathname`. + # If `archive_directory` is an absolute path, it is returned as-is. + def archive_pathname(file_pathname) + path = Pathname.new(archive_directory) + path = file_pathname.dirname.join(archive_directory) if path.relative? + + begin + path.mkpath unless path.exist? + rescue Errno::ENOENT => exc + raise(Errno::ENOENT, "DirmonJob failed to create archive directory: #{path}, #{exc.message}") + end + path.realpath + end + + # Validates job_class is a Rocket Job + def job_is_a_rocket_job + klass = job_class + return if job_class_name.nil? || klass&.ancestors&.include?(RocketJob::Job) + errors.add(:job_class_name, "Job #{job_class_name} must be defined and inherit from RocketJob::Job") + end + + # Does the job have all the supplied properties + def job_has_properties + klass = job_class + return unless klass + + properties.each_pair do |k, _v| + next if klass.public_method_defined?("#{k}=".to_sym) + errors.add(:properties, "Unknown Property: Attempted to set a value for #{k.inspect} which is not allowed on the job #{job_class_name}") + end + end end end