lib/loader/loader.rb in myreplicator-1.1.58 vs lib/loader/loader.rb in myreplicator-1.1.61

- old
+ new

@@ -50,56 +50,40 @@ def self.load initials = [] incrementals = [] all_files = Myreplicator::Loader.metadata_files - #Kernel.p "===== all_files =====" - #Kernel.p all_files - -=begin - all_files.each do |m| - #Kernel.p m - if m.export_type == "initial" - initials << m # Add initial to the list - all_files.delete(m) # Delete obj from mixed list - - all_files.each do |md| - if m.equals(md) && md.export_type == "incremental" - initials << md # incremental should happen after the initial load - all_files.delete(md) # remove from current list of files - end - end - end - end - - incrementals = all_files # Remaining are all incrementals -=end - # adding incremetal loading file to redis files_to_metadata = {} @redis = Redis.new(:host => Settings[:redis][:host], :port => Settings[:redis][:port]) @load_set = "myreplicator_load_set" @load_hash = "myreplicator_load_hash" + # clear out |k,v| of already deleted filed + @redis.hgetall(@load_hash).size + @redis.hgetall(@load_hash).each do |k, v| + if @redis.hget(@load_hash, k) == '1' + @redis.hdel(@load_hash, k) + end + end + # making the hash for mapping filepath to metadata object all_files.each do |m| if !(@redis.hexists(@load_hash, m.filepath)) @redis.hset(@load_hash, m.filepath, 0) @redis.sadd(@load_set, m.filepath) - else - if @redis.hget(@load_hash, m.filepath) == 1 - @redis.hdel(@load_hash, m.filepath) - end end files_to_metadata[m.filepath] = m end - while @redis.smembers(@load_set).size >= 0 + # processing the files in "queue" + while @redis.smembers(@load_set).size > 0 filepath = @redis.spop(@load_set) metadata = files_to_metadata[filepath] if metadata.blank? next end + # init load if metadata.export_type == "initial" Myreplicator::Log.run(:job_type => "loader", :name => "#{metadata.export_type}_import", :file => metadata.filename, :export_id => metadata.export_id) do |log| @@ -111,11 +95,11 @@ end Myreplicator::Loader.cleanup metadata end end @redis.hset(@load_hash, metadata.filepath, 1) - else #if metadata.export_type == "incremental" + else #if metadata.export_type == "incremental" # incremental load #1 Myreplicator::Log.run(:job_type => "loader", :name => "incremental_import", :file => metadata.filename, :export_id => metadata.export_id) do |log| @@ -125,50 +109,11 @@ end end #2 @redis.hset(@load_hash, metadata.filepath, 1) end - - end - - -=begin - #initial_procs = Loader.initial_loads initials - #parallel_load initial_procs - initials.each do |metadata| - Myreplicator::Log.run(:job_type => "loader", - :name => "#{metadata.export_type}_import", - :file => metadata.filename, - :export_id => metadata.export_id) do |log| - if Myreplicator::Loader.transfer_completed? metadata - if metadata.export_to == "vertica" - Myreplicator::Loader.incremental_load metadata - else - Myreplicator::Loader.initial_load metadata - end - Myreplicator::Loader.cleanup metadata - end - end - end - - #incremental_procs = Loader.incremental_loads incrementals - #parallel_load incremental_procs - #groups = Myreplicator::Loader.group_incrementals incrementals - #groups.each do |group| - incrementals.each do |metadata| - Myreplicator::Log.run(:job_type => "loader", - :name => "incremental_import", - :file => metadata.filename, - :export_id => metadata.export_id) do |log| - if Myreplicator::Loader.transfer_completed? metadata - Myreplicator::Loader.incremental_load metadata - Myreplicator::Loader.cleanup metadata - end - end - end - # end # group - #end # groups -=end + sleep(2) + end # end while end def self.parallel_load procs p = Parallelizer.new(:klass => "Myreplicator::Loader") procs.each do |proc|