lib/loader/loader.rb in myreplicator-1.1.20 vs lib/loader/loader.rb in myreplicator-1.1.21

- old
+ new

@@ -31,15 +31,15 @@ def self.load initials = [] incrementals = [] all_files = Myreplicator::Loader.metadata_files - Kernel.p "===== all_files =====" - Kernel.p all_files + #Kernel.p "===== all_files =====" + #Kernel.p all_files all_files.each do |m| - Kernel.p m + #Kernel.p m if m.export_type == "initial" initials << m # Add initial to the list all_files.delete(m) # Delete obj from mixed list all_files.each do |md| @@ -51,15 +51,46 @@ end end incrementals = all_files # Remaining are all incrementals - initial_procs = Loader.initial_loads initials - parallel_load initial_procs - - incremental_procs = Loader.incremental_loads incrementals - parallel_load incremental_procs + #initial_procs = Loader.initial_loads initials + #parallel_load initial_procs + initials.each do |metadata| + Myreplicator::Log.run(:job_type => "loader", + :name => "#{metadata.export_type}_import", + :file => metadata.filename, + :export_id => metadata.export_id) do |log| + if Myreplicator::Loader.transfer_completed? metadata + if metadata.export_to == "vertica" + Myreplicator::Loader.incremental_load metadata + else + Myreplicator::Loader.initial_load metadata + end + Myreplicator::Loader.cleanup metadata + end + end + end + + #incremental_procs = Loader.incremental_loads incrementals + #parallel_load incremental_procs + #groups = Myreplicator::Loader.group_incrementals incrementals + #groups.each do |group| + incrementals.each do |metadata| + Myreplicator::Log.run(:job_type => "loader", + :name => "incremental_import", + :file => metadata.filename, + :export_id => metadata.export_id) do |log| + if Myreplicator::Loader.transfer_completed? metadata + Myreplicator::Loader.incremental_load metadata + Myreplicator::Loader.cleanup metadata + end + end + end + # end # group + #end # groups + end def self.parallel_load procs p = Parallelizer.new(:klass => "Myreplicator::Loader") procs.each do |proc| @@ -76,22 +107,22 @@ def self.initial_loads initials procs = [] initials.each do |metadata| procs << Proc.new { - Log.run(:job_type => "loader", + Myreplicator::Log.run(:job_type => "loader", :name => "#{metadata.export_type}_import", :file => metadata.filename, :export_id => metadata.export_id) do |log| - if Loader.transfer_completed? metadata + if Myreplicator::Loader.transfer_completed? metadata if metadata.export_to == "vertica" - Loader.incremental_load metadata + Myreplicator::Loader.incremental_load metadata else - Loader.initial_load metadata + Myreplicator::Loader.initial_load metadata end - Loader.cleanup metadata + Myreplicator::Loader.cleanup metadata end end } end @@ -108,18 +139,18 @@ groups = Loader.group_incrementals incrementals procs = [] groups.each do |group| procs << Proc.new { group.each do |metadata| - Log.run(:job_type => "loader", + Myreplicator::Log.run(:job_type => "loader", :name => "incremental_import", :file => metadata.filename, :export_id => metadata.export_id) do |log| - if Loader.transfer_completed? metadata - Loader.incremental_load metadata - Loader.cleanup metadata + if Myreplicator::Loader.transfer_completed? metadata + Myreplicator::Loader.incremental_load metadata + Myreplicator::Loader.cleanup metadata end end end # group } @@ -229,14 +260,14 @@ ## # Returns true if the transfer of the file # being loaded is completed ## def self.transfer_completed? metadata - Kernel.p "===== transfer_completed? metadata =====" - Kernel.p ({:export_id => metadata.export_id, - :file => metadata.export_path, - :job_type => "transporter"}) + #Kernel.p "===== transfer_completed? metadata =====" + #Kernel.p ({:export_id => metadata.export_id, + # :file => metadata.export_path, + #:job_type => "transporter"}) if Log.completed?(:export_id => metadata.export_id, :file => metadata.export_path, :job_type => "transporter") return true end @@ -281,42 +312,42 @@ files = [] Dir.glob(File.join(tmp_dir, "*.json")).each do |json_file| files << ExportMetadata.new(:metadata_path => json_file) end result = [] - Kernel.p files + #Kernel.p files files.each do |file| - puts "<<<<<<<<<<<<<<<<" + #puts "<<<<<<<<<<<<<<<<" job = Export.where("id = #{file.export_id}").first - puts "<<<<<<<<<<<<<<<<" - Kernel.p job - puts "<<<<<<<<<<<<<<<<" - puts "&&&&&&&&&&&&&&&&&&&&&&&&&&" - Kernel.p file - puts "&&&&&&&&&&&&&&&&&&&&&&&&&&" + #puts "<<<<<<<<<<<<<<<<" + #Kernel.p job + #puts "<<<<<<<<<<<<<<<<" + #puts "&&&&&&&&&&&&&&&&&&&&&&&&&&" + #Kernel.p file + #puts "&&&&&&&&&&&&&&&&&&&&&&&&&&" #if job.state == "transport_completed" - result << file + result << file #end - puts "^^^^^^^^^^^^^^^^^^^^^^^^^^" - Kernel.p result - puts "^^^^^^^^^^^^^^^^^^^^^^^^^^" + #puts "^^^^^^^^^^^^^^^^^^^^^^^^^^" + #Kernel.p result + #puts "^^^^^^^^^^^^^^^^^^^^^^^^^^" end - puts "<<<<<<<<<<<<<<<<" + #puts "<<<<<<<<<<<<<<<<" - puts "<<<<<<<<<<<<<<<<" + #puts "<<<<<<<<<<<<<<<<" return result end ## # Clears files that are older than the passed metadata file. # Note: This methoded is provided to ensure no old incremental files # ever get loaded after the schema change algorithm has been applied ## def self.clear_older_files metadata files = Loader.metadata_files - Kernel.p "===== clear old files =====" - Kernel.p metadata - Kernel.p files + #Kernel.p "===== clear old files =====" + #Kernel.p metadata + #Kernel.p files max_date = DateTime.strptime metadata.export_time files.each do |m| if metadata.export_id == m.export_id if max_date > DateTime.strptime(m.export_time) Loader.cleanup m if metadata.filepath != m.filepath