lib/rocket_job/jobs/dirmon_job.rb in rocketjob-4.2.0 vs lib/rocket_job/jobs/dirmon_job.rb in rocketjob-4.3.0.beta
- old
+ new
@@ -68,30 +68,37 @@
# Checks the directories for new files, starting jobs if files have not changed
# since the last run
def check_directories
new_file_names = {}
DirmonEntry.enabled.each do |entry|
- entry.each do |pathname|
+ entry.each do |iopath|
+ # S3 files are only visible once completely uploaded.
+ if iopath.is_a?(IOStreams::Paths::S3)
+ logger.info("S3 File: #{iopath}. Starting: #{entry.job_class_name}")
+ entry.later(iopath)
+ next
+ end
+
# BSON Keys cannot contain periods
- key = pathname.to_s.tr('.', '_')
+ key = iopath.to_s.tr('.', '_')
previous_size = previous_file_names[key]
- size = check_file(entry, pathname, previous_size)
+ size = check_file(entry, iopath, previous_size)
new_file_names[key] = size if size
end
end
self.previous_file_names = new_file_names
end
# Checks if a file should result in starting a job
# Returns [Integer] file size, or nil if the file started a job
- def check_file(entry, pathname, previous_size)
- size = pathname.size
+ def check_file(entry, iopath, previous_size)
+ size = iopath.size
if previous_size && (previous_size == size)
- logger.info("File stabilized: #{pathname}. Starting: #{entry.job_class_name}")
- entry.later(pathname)
+ logger.info("File stabilized: #{iopath}. Starting: #{entry.job_class_name}")
+ entry.later(iopath)
nil
else
- logger.info("Found file: #{pathname}. File size: #{size}")
+ logger.info("Found file: #{iopath}. File size: #{size}")
# Keep for the next run
size
end
end
end