lib/loader/loader.rb in myreplicator-1.1.58 vs lib/loader/loader.rb in myreplicator-1.1.61
- old
+ new
@@ -50,56 +50,40 @@
def self.load
initials = []
incrementals = []
all_files = Myreplicator::Loader.metadata_files
- #Kernel.p "===== all_files ====="
- #Kernel.p all_files
-
-=begin
- all_files.each do |m|
- #Kernel.p m
- if m.export_type == "initial"
- initials << m # Add initial to the list
- all_files.delete(m) # Delete obj from mixed list
-
- all_files.each do |md|
- if m.equals(md) && md.export_type == "incremental"
- initials << md # incremental should happen after the initial load
- all_files.delete(md) # remove from current list of files
- end
- end
- end
- end
-
- incrementals = all_files # Remaining are all incrementals
-=end
- # adding incremetal loading file to redis
files_to_metadata = {}
@redis = Redis.new(:host => Settings[:redis][:host], :port => Settings[:redis][:port])
@load_set = "myreplicator_load_set"
@load_hash = "myreplicator_load_hash"
+ # clear out |k,v| of already deleted filed
+ @redis.hgetall(@load_hash).size
+ @redis.hgetall(@load_hash).each do |k, v|
+ if @redis.hget(@load_hash, k) == '1'
+ @redis.hdel(@load_hash, k)
+ end
+ end
+ # making the hash for mapping filepath to metadata object
all_files.each do |m|
if !(@redis.hexists(@load_hash, m.filepath))
@redis.hset(@load_hash, m.filepath, 0)
@redis.sadd(@load_set, m.filepath)
- else
- if @redis.hget(@load_hash, m.filepath) == 1
- @redis.hdel(@load_hash, m.filepath)
- end
end
files_to_metadata[m.filepath] = m
end
- while @redis.smembers(@load_set).size >= 0
+ # processing the files in "queue"
+ while @redis.smembers(@load_set).size > 0
filepath = @redis.spop(@load_set)
metadata = files_to_metadata[filepath]
if metadata.blank?
next
end
+ # init load
if metadata.export_type == "initial"
Myreplicator::Log.run(:job_type => "loader",
:name => "#{metadata.export_type}_import",
:file => metadata.filename,
:export_id => metadata.export_id) do |log|
@@ -111,11 +95,11 @@
end
Myreplicator::Loader.cleanup metadata
end
end
@redis.hset(@load_hash, metadata.filepath, 1)
- else #if metadata.export_type == "incremental"
+ else #if metadata.export_type == "incremental" # incremental load
#1
Myreplicator::Log.run(:job_type => "loader",
:name => "incremental_import",
:file => metadata.filename,
:export_id => metadata.export_id) do |log|
@@ -125,50 +109,11 @@
end
end
#2
@redis.hset(@load_hash, metadata.filepath, 1)
end
-
- end
-
-
-=begin
- #initial_procs = Loader.initial_loads initials
- #parallel_load initial_procs
- initials.each do |metadata|
- Myreplicator::Log.run(:job_type => "loader",
- :name => "#{metadata.export_type}_import",
- :file => metadata.filename,
- :export_id => metadata.export_id) do |log|
- if Myreplicator::Loader.transfer_completed? metadata
- if metadata.export_to == "vertica"
- Myreplicator::Loader.incremental_load metadata
- else
- Myreplicator::Loader.initial_load metadata
- end
- Myreplicator::Loader.cleanup metadata
- end
- end
- end
-
- #incremental_procs = Loader.incremental_loads incrementals
- #parallel_load incremental_procs
- #groups = Myreplicator::Loader.group_incrementals incrementals
- #groups.each do |group|
- incrementals.each do |metadata|
- Myreplicator::Log.run(:job_type => "loader",
- :name => "incremental_import",
- :file => metadata.filename,
- :export_id => metadata.export_id) do |log|
- if Myreplicator::Loader.transfer_completed? metadata
- Myreplicator::Loader.incremental_load metadata
- Myreplicator::Loader.cleanup metadata
- end
- end
- end
- # end # group
- #end # groups
-=end
+ sleep(2)
+ end # end while
end
def self.parallel_load procs
p = Parallelizer.new(:klass => "Myreplicator::Loader")
procs.each do |proc|