lib/rocket_job/jobs/dirmon_job.rb in rocketjob-6.0.0.rc3 vs lib/rocket_job/jobs/dirmon_job.rb in rocketjob-6.0.0

- old
+ new

@@ -28,78 +28,67 @@
     #   To start Dirmon for the first time
     #     RocketJob::Jobs::DirmonJob.create!
     #
     #   If another DirmonJob instance is already queued or running, then the create
     #   above will fail with:
-    #     MongoMapper::DocumentNotValid: Validation failed: State Another instance of this job is already queued or running
+    #     Validation failed: State Another instance of this job is already queued or running
     #
     #   Or to start DirmonJob and ignore errors if already running
     #     RocketJob::Jobs::DirmonJob.create
     class DirmonJob < RocketJob::Job
-      # Only allow one DirmonJob instance to be running at a time
-      include RocketJob::Plugins::Singleton
-      # Start a new job when this one completes, fails, or aborts
-      include RocketJob::Plugins::Restart
+      include RocketJob::Plugins::Cron
 
-      self.priority = 30
+      # Runs every 5 minutes by default
+      self.cron_schedule = "*/5 * * * * UTC"
+      self.description   = "Directory Monitor"
+      self.priority      = 30
 
-      # Number of seconds between directory scans. Default 5 mins
-      field :check_seconds, type: Float, default: 300.0, copy_on_restart: true
-
       # Hash[file_name, size]
       field :previous_file_names, type: Hash, default: {}, copy_on_restart: true
 
-      before_create :set_run_at
-
-      # Iterate over each Dirmon entry looking for new files
-      # If a new file is found, it is not processed immediately, instead
-      # it is passed to the next run of this job along with the file size.
-      # If the file size has not changed, the Job is kicked off.
+      # Checks the directories for new files, starting jobs if files have not changed since the last run.
       def perform
        check_directories
      end
 
      private
 
-      # Set a run_at when a new instance of this job is created
-      def set_run_at
-        self.run_at = Time.now + check_seconds
-      end
-
-      # Checks the directories for new files, starting jobs if files have not changed
-      # since the last run
+      # Iterate over each Dirmon Entry looking for new files
+      # If a new file is found, it is not processed immediately, instead
+      # it is passed to the next run of this job along with the file size.
+      # If the file size has not changed, the Job is kicked off.
      def check_directories
        new_file_names = {}
-        DirmonEntry.enabled.each do |entry|
-          entry.each do |iopath|
-            # S3 files are only visible once completely uploaded.
-            unless iopath.partial_files_visible?
-              logger.info("File: #{iopath}. Starting: #{entry.job_class_name}")
-              entry.later(iopath)
+        DirmonEntry.enabled.each do |dirmon_entry|
+          dirmon_entry.each do |path|
+            # Skip file size checking since S3 files are only visible once completely uploaded.
+            unless path.partial_files_visible?
+              logger.info("File: #{path}. Starting: #{dirmon_entry.job_class_name}")
+              dirmon_entry.later(path)
              next
            end
 
            # BSON Keys cannot contain periods
-            key           = iopath.to_s.tr(".", "_")
+            key           = path.to_s.tr(".", "_")
            previous_size = previous_file_names[key]
            # Check every few minutes for a file size change before trying to process the file.
-            size                = check_file(entry, iopath, previous_size)
+            size                = check_file(dirmon_entry, path, previous_size)
            new_file_names[key] = size if size
          end
        end
        self.previous_file_names = new_file_names
      end
 
      # Checks if a file should result in starting a job
      # Returns [Integer] file size, or nil if the file started a job
-      def check_file(entry, iopath, previous_size)
-        size = iopath.size
+      def check_file(dirmon_entry, path, previous_size)
+        size = path.size
        if previous_size && (previous_size == size)
-          logger.info("File stabilized: #{iopath}. Starting: #{entry.job_class_name}")
-          entry.later(iopath)
+          logger.info("File stabilized: #{path}. Starting: #{dirmon_entry.job_class_name}")
+          dirmon_entry.later(path)
          nil
        else
-          logger.info("Found file: #{iopath}. File size: #{size}")
+          logger.info("Found file: #{path}. File size: #{size}")
          # Keep for the next run
          size
        end
      end
    end
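Note on the behavioral change above: 6.0.0.rc3 controlled the scan interval with the check_seconds field and a before_create :set_run_at callback, rescheduling itself through the Singleton and Restart plugins; 6.0.0 replaces all of that with the Cron plugin and a cron_schedule field that defaults to every 5 minutes. A minimal usage sketch follows, assuming cron_schedule can be overridden at creation time like any other field (the one-minute override value is illustrative, not taken from the diff):

  # Start Dirmon with the default 5 minute schedule shown in the diff
  RocketJob::Jobs::DirmonJob.create!

  # Assumed per-instance override of the Cron plugin's cron_schedule field,
  # replacing what check_seconds: 60.0 would have done in 6.0.0.rc3
  RocketJob::Jobs::DirmonJob.create!(cron_schedule: "*/1 * * * * UTC")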