lib/rocket_job/jobs/dirmon_job.rb: rocketjob-6.0.0.rc3 vs rocketjob-6.0.0
- old
+ new
@@ -28,78 +28,67 @@
# To start Dirmon for the first time
# RocketJob::Jobs::DirmonJob.create!
#
# If another DirmonJob instance is already queued or running, then the create
# above will fail with:
- # MongoMapper::DocumentNotValid: Validation failed: State Another instance of this job is already queued or running
+ # Validation failed: State Another instance of this job is already queued or running
#
# Or to start DirmonJob and ignore errors if already running
# RocketJob::Jobs::DirmonJob.create
class DirmonJob < RocketJob::Job
- # Only allow one DirmonJob instance to be running at a time
- include RocketJob::Plugins::Singleton
- # Start a new job when this one completes, fails, or aborts
- include RocketJob::Plugins::Restart
+ include RocketJob::Plugins::Cron
- self.priority = 30
+ # Runs every 5 minutes by default
+ self.cron_schedule = "*/5 * * * * UTC"
+ self.description = "Directory Monitor"
+ self.priority = 30
- # Number of seconds between directory scans. Default 5 mins
- field :check_seconds, type: Float, default: 300.0, copy_on_restart: true
-
# Hash[file_name, size]
field :previous_file_names, type: Hash, default: {}, copy_on_restart: true
- before_create :set_run_at
-
- # Iterate over each Dirmon entry looking for new files
- # If a new file is found, it is not processed immediately, instead
- # it is passed to the next run of this job along with the file size.
- # If the file size has not changed, the Job is kicked off.
+ # Checks the directories for new files, starting jobs if files have not changed since the last run.
def perform
check_directories
end
private
- # Set a run_at when a new instance of this job is created
- def set_run_at
- self.run_at = Time.now + check_seconds
- end
-
- # Checks the directories for new files, starting jobs if files have not changed
- # since the last run
+ # Iterate over each Dirmon Entry looking for new files
+ # If a new file is found, it is not processed immediately, instead
+ # it is passed to the next run of this job along with the file size.
+ # If the file size has not changed, the Job is kicked off.
def check_directories
new_file_names = {}
- DirmonEntry.enabled.each do |entry|
- entry.each do |iopath|
- # S3 files are only visible once completely uploaded.
- unless iopath.partial_files_visible?
- logger.info("File: #{iopath}. Starting: #{entry.job_class_name}")
- entry.later(iopath)
+ DirmonEntry.enabled.each do |dirmon_entry|
+ dirmon_entry.each do |path|
+ # Skip file size checking since S3 files are only visible once completely uploaded.
+ unless path.partial_files_visible?
+ logger.info("File: #{path}. Starting: #{dirmon_entry.job_class_name}")
+ dirmon_entry.later(path)
next
end
# BSON Keys cannot contain periods
- key = iopath.to_s.tr(".", "_")
+ key = path.to_s.tr(".", "_")
previous_size = previous_file_names[key]
# Check every few minutes for a file size change before trying to process the file.
- size = check_file(entry, iopath, previous_size)
+ size = check_file(dirmon_entry, path, previous_size)
new_file_names[key] = size if size
end
end
self.previous_file_names = new_file_names
end
# Checks if a file should result in starting a job
# Returns [Integer] file size, or nil if the file started a job
- def check_file(entry, iopath, previous_size)
- size = iopath.size
+ def check_file(dirmon_entry, path, previous_size)
+ size = path.size
if previous_size && (previous_size == size)
- logger.info("File stabilized: #{iopath}. Starting: #{entry.job_class_name}")
- entry.later(iopath)
+ logger.info("File stabilized: #{path}. Starting: #{dirmon_entry.job_class_name}")
+ dirmon_entry.later(path)
nil
else
- logger.info("Found file: #{iopath}. File size: #{size}")
+ logger.info("Found file: #{path}. File size: #{size}")
# Keep for the next run
size
end
end
end
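
The net effect of the change is that scheduling moves from the Singleton/Restart plugins plus a check_seconds delay to the Cron plugin with a five minute default schedule. A minimal sketch of starting the monitor under the new version, assuming cron_schedule behaves as an ordinary overridable field (the alternate schedule below is illustrative only):

    require "rocketjob"

    # Start the directory monitor; the Cron plugin re-queues it on the schedule above.
    RocketJob::Jobs::DirmonJob.create!

    # Or, as a sketch, override the default "*/5 * * * * UTC" schedule at creation time.
    RocketJob::Jobs::DirmonJob.create!(cron_schedule: "*/10 * * * * UTC")

As the comment block at the top of the class notes, only one of the two calls can succeed at a time: a second create! raises the "Another instance of this job is already queued or running" validation error.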
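
For check_directories to find anything, at least one enabled DirmonEntry must exist. A hedged sketch of creating one, assuming the usual DirmonEntry attributes of name, pattern, and job_class_name and its enable! transition; every value below is hypothetical:

    entry = RocketJob::DirmonEntry.create!(
      name:           "Nightly import",           # hypothetical entry name
      pattern:        "/exports/incoming/*.csv",  # hypothetical glob the entry scans
      job_class_name: "ImportJob"                 # hypothetical job started via dirmon_entry.later(path)
    )
    entry.enable!  # DirmonEntry.enabled above only yields enabled entries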
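
The previous_file_names hash is what carries state between cron runs: a file is only handed to later once two consecutive scans see the same size; otherwise its current size is kept for the next run. A standalone Ruby illustration of that check, detached from RocketJob itself:

    # Keys mirror the BSON-safe names built in check_directories (periods replaced).
    previous = { "report_csv" => 1_024 }                      # sizes recorded on the last run
    current  = { "report_csv" => 1_024, "orders_csv" => 64 }  # sizes seen on this run

    carry_forward = {}
    current.each do |key, size|
      if previous[key] == size
        puts "File stabilized: #{key}"   # DirmonJob calls dirmon_entry.later(path) here
      else
        carry_forward[key] = size        # size changed or file is new; re-check next run
      end
    end
    carry_forward # => { "orders_csv" => 64 }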