lib/rocket_job/jobs/dirmon_job.rb in rocketjob-4.2.0 vs lib/rocket_job/jobs/dirmon_job.rb in rocketjob-4.3.0.beta

- old
+ new

@@ -68,30 +68,37 @@ # Checks the directories for new files, starting jobs if files have not changed # since the last run def check_directories new_file_names = {} DirmonEntry.enabled.each do |entry| - entry.each do |pathname| + entry.each do |iopath| + # S3 files are only visible once completely uploaded. + if iopath.is_a?(IOStreams::Paths::S3) + logger.info("S3 File: #{iopath}. Starting: #{entry.job_class_name}") + entry.later(iopath) + next + end + # BSON Keys cannot contain periods - key = pathname.to_s.tr('.', '_') + key = iopath.to_s.tr('.', '_') previous_size = previous_file_names[key] - size = check_file(entry, pathname, previous_size) + size = check_file(entry, iopath, previous_size) new_file_names[key] = size if size end end self.previous_file_names = new_file_names end # Checks if a file should result in starting a job # Returns [Integer] file size, or nil if the file started a job - def check_file(entry, pathname, previous_size) - size = pathname.size + def check_file(entry, iopath, previous_size) + size = iopath.size if previous_size && (previous_size == size) - logger.info("File stabilized: #{pathname}. Starting: #{entry.job_class_name}") - entry.later(pathname) + logger.info("File stabilized: #{iopath}. Starting: #{entry.job_class_name}") + entry.later(iopath) nil else - logger.info("Found file: #{pathname}. File size: #{size}") + logger.info("Found file: #{iopath}. File size: #{size}") # Keep for the next run size end end end