lib/etl/control/source.rb in activewarehouse-etl-0.9.5.rc1 vs lib/etl/control/source.rb in activewarehouse-etl-1.0.0.rc1
- old
+ new
@@ -38,12 +38,11 @@
def initialize(control, configuration, definition)
@control = control
@configuration = configuration
@definition = definition
- @store_locally = true
- @store_locally = configuration[:store_locally] unless configuration[:store_locally].nil?
+ @store_locally = configuration[:store_locally].nil? ? true : configuration[:store_locally]
end
# Get an array of errors that occur during reading from the source
def errors
@errors ||= []
@@ -85,13 +84,36 @@
# Get the last fully written local file
def last_local_file
File.join(local_directory, File.basename(last_local_file_trigger, '.trig'))
end
- # Get the last local file trigger
+ # Get the last local file trigger filename using timestamp in filenames.
+ # Filename is in the format YYYYMMDDHHMMSS.csv.trig, but in the case of a
+ # file source there is an unpadded sequence number before the file
+ # extension. This code may not return the correct "last" file in that
+ # case (in particular when there are 10 or more source files). However,
+ # at this point only the database source calls the method, and it wouldn't
+ # make sense for a file source to use it if multiple files are expected
def last_local_file_trigger
- Dir.glob(File.join(local_directory, '*.trig')).last
+ trig_files = []
+ trig_ext = '.csv.trig'
+
+ # Store the basename (without extension) of all files that end in the
+ # desired extension
+ Dir.glob(File.join(local_directory, "*" + trig_ext)) do |f|
+ # Extract the basename of each file with the extension snipped off
+ trig_files << File.basename(f, trig_ext) if File.file?(f)
+ end
+
+ # Throw an exception if no trigger files are available
+ raise "Local cache trigger file not found" if trig_files.empty?
+
+ # Sort trigger file strings and get the last one
+ last_trig = trig_files.sort {|a,b| a <=> b}.last
+
+ # Return the file path including extension
+ File.join(local_directory, last_trig + trig_ext)
end
# Get the local trigger file that is used to indicate that the file has
# been completely written
def local_file_trigger(file)
@@ -101,9 +123,22 @@
# Return true if the source should read locally.
def read_locally
Engine.read_locally
end
+ # Get the order of fields that this source will present to the pipeline
+ def order
+ order = []
+ definition.each do |item|
+ case item
+ when Hash
+ order << item[:name]
+ else
+ order << item
+ end
+ end
+ order
+ end
end
end
end
Dir[File.dirname(__FILE__) + "/source/*.rb"].each { |file| require(file) }