lib/loader/loader.rb in myreplicator-1.1.20 vs lib/loader/loader.rb in myreplicator-1.1.21
- old
+ new
@@ -31,15 +31,15 @@
def self.load
initials = []
incrementals = []
all_files = Myreplicator::Loader.metadata_files
- Kernel.p "===== all_files ====="
- Kernel.p all_files
+ #Kernel.p "===== all_files ====="
+ #Kernel.p all_files
all_files.each do |m|
- Kernel.p m
+ #Kernel.p m
if m.export_type == "initial"
initials << m # Add initial to the list
all_files.delete(m) # Delete obj from mixed list
all_files.each do |md|
@@ -51,15 +51,46 @@
end
end
incrementals = all_files # Remaining are all incrementals
- initial_procs = Loader.initial_loads initials
- parallel_load initial_procs
-
- incremental_procs = Loader.incremental_loads incrementals
- parallel_load incremental_procs
+ #initial_procs = Loader.initial_loads initials
+ #parallel_load initial_procs
+ initials.each do |metadata|
+ Myreplicator::Log.run(:job_type => "loader",
+ :name => "#{metadata.export_type}_import",
+ :file => metadata.filename,
+ :export_id => metadata.export_id) do |log|
+ if Myreplicator::Loader.transfer_completed? metadata
+ if metadata.export_to == "vertica"
+ Myreplicator::Loader.incremental_load metadata
+ else
+ Myreplicator::Loader.initial_load metadata
+ end
+ Myreplicator::Loader.cleanup metadata
+ end
+ end
+ end
+
+ #incremental_procs = Loader.incremental_loads incrementals
+ #parallel_load incremental_procs
+ #groups = Myreplicator::Loader.group_incrementals incrementals
+ #groups.each do |group|
+ incrementals.each do |metadata|
+ Myreplicator::Log.run(:job_type => "loader",
+ :name => "incremental_import",
+ :file => metadata.filename,
+ :export_id => metadata.export_id) do |log|
+ if Myreplicator::Loader.transfer_completed? metadata
+ Myreplicator::Loader.incremental_load metadata
+ Myreplicator::Loader.cleanup metadata
+ end
+ end
+ end
+ # end # group
+ #end # groups
+
end
def self.parallel_load procs
p = Parallelizer.new(:klass => "Myreplicator::Loader")
procs.each do |proc|
@@ -76,22 +107,22 @@
def self.initial_loads initials
procs = []
initials.each do |metadata|
procs << Proc.new {
- Log.run(:job_type => "loader",
+ Myreplicator::Log.run(:job_type => "loader",
:name => "#{metadata.export_type}_import",
:file => metadata.filename,
:export_id => metadata.export_id) do |log|
- if Loader.transfer_completed? metadata
+ if Myreplicator::Loader.transfer_completed? metadata
if metadata.export_to == "vertica"
- Loader.incremental_load metadata
+ Myreplicator::Loader.incremental_load metadata
else
- Loader.initial_load metadata
+ Myreplicator::Loader.initial_load metadata
end
- Loader.cleanup metadata
+ Myreplicator::Loader.cleanup metadata
end
end
}
end
@@ -108,18 +139,18 @@
groups = Loader.group_incrementals incrementals
procs = []
groups.each do |group|
procs << Proc.new {
group.each do |metadata|
- Log.run(:job_type => "loader",
+ Myreplicator::Log.run(:job_type => "loader",
:name => "incremental_import",
:file => metadata.filename,
:export_id => metadata.export_id) do |log|
- if Loader.transfer_completed? metadata
- Loader.incremental_load metadata
- Loader.cleanup metadata
+ if Myreplicator::Loader.transfer_completed? metadata
+ Myreplicator::Loader.incremental_load metadata
+ Myreplicator::Loader.cleanup metadata
end
end
end # group
}
@@ -229,14 +260,14 @@
##
# Returns true if the transfer of the file
# being loaded is completed
##
def self.transfer_completed? metadata
- Kernel.p "===== transfer_completed? metadata ====="
- Kernel.p ({:export_id => metadata.export_id,
- :file => metadata.export_path,
- :job_type => "transporter"})
+ #Kernel.p "===== transfer_completed? metadata ====="
+ #Kernel.p ({:export_id => metadata.export_id,
+ # :file => metadata.export_path,
+ #:job_type => "transporter"})
if Log.completed?(:export_id => metadata.export_id,
:file => metadata.export_path,
:job_type => "transporter")
return true
end
@@ -281,42 +312,42 @@
files = []
Dir.glob(File.join(tmp_dir, "*.json")).each do |json_file|
files << ExportMetadata.new(:metadata_path => json_file)
end
result = []
- Kernel.p files
+ #Kernel.p files
files.each do |file|
- puts "<<<<<<<<<<<<<<<<"
+ #puts "<<<<<<<<<<<<<<<<"
job = Export.where("id = #{file.export_id}").first
- puts "<<<<<<<<<<<<<<<<"
- Kernel.p job
- puts "<<<<<<<<<<<<<<<<"
- puts "&&&&&&&&&&&&&&&&&&&&&&&&&&"
- Kernel.p file
- puts "&&&&&&&&&&&&&&&&&&&&&&&&&&"
+ #puts "<<<<<<<<<<<<<<<<"
+ #Kernel.p job
+ #puts "<<<<<<<<<<<<<<<<"
+ #puts "&&&&&&&&&&&&&&&&&&&&&&&&&&"
+ #Kernel.p file
+ #puts "&&&&&&&&&&&&&&&&&&&&&&&&&&"
#if job.state == "transport_completed"
- result << file
+ result << file
#end
- puts "^^^^^^^^^^^^^^^^^^^^^^^^^^"
- Kernel.p result
- puts "^^^^^^^^^^^^^^^^^^^^^^^^^^"
+ #puts "^^^^^^^^^^^^^^^^^^^^^^^^^^"
+ #Kernel.p result
+ #puts "^^^^^^^^^^^^^^^^^^^^^^^^^^"
end
- puts "<<<<<<<<<<<<<<<<"
+ #puts "<<<<<<<<<<<<<<<<"
- puts "<<<<<<<<<<<<<<<<"
+ #puts "<<<<<<<<<<<<<<<<"
return result
end
##
# Clears files that are older than the passed metadata file.
# Note: This methoded is provided to ensure no old incremental files
# ever get loaded after the schema change algorithm has been applied
##
def self.clear_older_files metadata
files = Loader.metadata_files
- Kernel.p "===== clear old files ====="
- Kernel.p metadata
- Kernel.p files
+ #Kernel.p "===== clear old files ====="
+ #Kernel.p metadata
+ #Kernel.p files
max_date = DateTime.strptime metadata.export_time
files.each do |m|
if metadata.export_id == m.export_id
if max_date > DateTime.strptime(m.export_time)
Loader.cleanup m if metadata.filepath != m.filepath