lib/etl/control/source.rb in activewarehouse-etl-0.9.5.rc1 vs lib/etl/control/source.rb in activewarehouse-etl-1.0.0.rc1

- old
+ new

@@ -38,12 +38,11 @@
       def initialize(control, configuration, definition)
         @control = control
         @configuration = configuration
         @definition = definition
 
-        @store_locally = true
-        @store_locally = configuration[:store_locally] unless configuration[:store_locally].nil?
+        @store_locally = configuration[:store_locally].nil? ? true : configuration[:store_locally]
       end
 
       # Get an array of errors that occur during reading from the source
       def errors
         @errors ||= []
@@ -85,13 +84,36 @@
       # Get the last fully written local file
       def last_local_file
         File.join(local_directory, File.basename(last_local_file_trigger, '.trig'))
       end
 
-      # Get the last local file trigger
+      # Get the last local file trigger filename using timestamp in filenames.
+      # Filename is in the format YYYYMMDDHHMMSS.csv.trig, but in the case of a
+      # file source there is an unpadded sequence number before the file
+      # extension. This code may not return the correct "last" file in that
+      # case (in particular when there are 10 or more source files). However,
+      # at this point only the database source calls the method, and it wouldn't
+      # make sense for a file source to use it if multiple files are expected
       def last_local_file_trigger
-        Dir.glob(File.join(local_directory, '*.trig')).last
+        trig_files = []
+        trig_ext = '.csv.trig'
+
+        # Store the basename (without extension) of all files that end in the
+        # desired extension
+        Dir.glob(File.join(local_directory, "*" + trig_ext)) do |f|
+          # Extract the basename of each file with the extension snipped off
+          trig_files << File.basename(f, trig_ext) if File.file?(f)
+        end
+
+        # Throw an exception if no trigger files are available
+        raise "Local cache trigger file not found" if trig_files.empty?
+
+        # Sort trigger file strings and get the last one
+        last_trig = trig_files.sort {|a,b| a <=> b}.last
+
+        # Return the file path including extension
+        File.join(local_directory, last_trig + trig_ext)
       end
 
       # Get the local trigger file that is used to indicate that the file has
       # been completely written
       def local_file_trigger(file)
@@ -101,9 +123,22 @@
       # Return true if the source should read locally.
       def read_locally
         Engine.read_locally
       end
 
+      # Get the order of fields that this source will present to the pipeline
+      def order
+        order = []
+        definition.each do |item|
+          case item
+          when Hash
+            order << item[:name]
+          else
+            order << item
+          end
+        end
+        order
+      end
     end
   end
 end
 Dir[File.dirname(__FILE__) + "/source/*.rb"].each { |file| require(file) }