lib/rhoconnect/jobs/bulk_data_job.rb in rhoconnect-4.0.4 vs lib/rhoconnect/jobs/bulk_data_job.rb in rhoconnect-5.1.1

- old
+ new

@@ -1,12 +1,12 @@
-require 'zip/zip'
+require 'zip'
 require 'zlib'
 
 module Rhoconnect
   module BulkDataJob
     @queue = :bulk_data
-    
+
     def self.perform(params)
       do_bulk_job(params) do |bulk_data|
         timer = start_timer('starting bulk data process')
         bulk_data.process_sources
         timer = lap_timer('process_sources',timer)
@@ -48,11 +48,11 @@
     def self.import_data_to_object_values(db,source,is_selected=true)
       data = {}
       data = source.get_data(:md) if is_selected
       counter = {}
       db.transaction do |database|
-        database.prepare("insert into object_values 
+        database.prepare("insert into object_values
           (source_id,attrib,object,value) values (?,?,?,?)") do |stmt|
           data.each do |object_id,object|
             object.each do |attrib,value|
               counter[attrib] = counter[attrib] ? counter[attrib] + 1 : 1
               stmt.execute(source.source_id.to_i,attrib,object_id,value)
@@ -60,81 +60,81 @@
           end
         end
       end
       counter
     end
-    
+
     # Loads data into fixed schema table based on source settings
     def self.import_data_to_fixed_schema(db,source,is_selected=true)
       data = {}
       data = source.get_data(:md) if is_selected
-      counter = {}
+      # counter = {}
       columns,qm = [],[]
       create_table = ["\"object\" varchar(255) PRIMARY KEY"]
       schema = JSON.parse(source.schema)
-      
+
       db.transaction do |database|
         # Create a table with columns specified by 'property' array in settings
         schema['property'].each do |key,value|
-          create_table << "\"#{key}\" varchar default NULL" 
+          create_table << "\"#{key}\" varchar default NULL"
           columns << key
           qm << '?'
         end
         database.execute("CREATE TABLE #{source.name}(#{create_table.join(",")} );")
         database.prepare("insert into #{source.name} (object,#{columns.join(',')}) values (?,#{qm.join(',')})") do |stmt|
           data.each do |obj,row|
             args = [obj]
             columns.each { |col| args << row[col] }
             # The * is used to expand an array into individual arguments for 'execute' method.
-            # JRuby (1.6.0) won't work without asterisk, but other rubies doing well! 
-            stmt.execute(*args) 
+            # JRuby (1.6.0) won't work without asterisk, but other rubies doing well!
+            stmt.execute(*args)
          end
         end
-        
+
         # Create indexes for specified columns in settings 'index'
         schema['index'].each do |key,value|
           val2 = ""
           value.split(',').each do |col|
             val2 += ',' if val2.length > 0
             val2 += "\"#{col}\""
           end
-          
+
           database.execute("CREATE INDEX #{key} on #{source.name} (#{val2});")
         end if schema['index']
-        
+
         # Create unique indexes for specified columns in settings 'unique_index'
         schema['unique_index'].each do |key,value|
           val2 = ""
           value.split(',').each do |col|
             val2 += ',' if val2.length > 0
             val2 += "\"#{col}\""
           end
-          
+
           database.execute("CREATE UNIQUE INDEX #{key} on #{source.name} (#{val2});")
         end if schema['unique_index']
       end
-      
+
       return {}
     end
-    
+
     def self.refs_to_s(refs)
       str = ''
       refs.sort.each do |name,value|
         str << "#{name},#{value},"
       end
       str[0..-2]
     end
-    
+
     # #2354: Bulk sync not updating sources table
     # last_inserted_size +
     # last_deleted_size
     # backend_refresh_time +
-    def self.populate_sources_table(db,sources_refs) 
+    def self.populate_sources_table(db,sources_refs)
       db.transaction do |database|
         database.prepare("insert into sources
-          (source_id,name,sync_priority,partition,sync_type,source_attribs,metadata,schema,blob_attribs,associations,last_inserted_size,backend_refresh_time) 
+          (source_id,name,sync_priority,partition,sync_type,source_attribs,metadata,schema,blob_attribs,associations,last_inserted_size,backend_refresh_time)
           values (?,?,?,?,?,?,?,?,?,?,?,?)") do |stmt|
           sources_refs.each do |source_name,ref|
             s = ref[:source]
             # skipped sources should be marked with sync type :none
            sync_type = ref[:skip_source] ? 'none' : s.sync_type.to_s
@@ -142,21 +142,21 @@
             sync_type,refs_to_s(ref[:refs]),s.get_value(:metadata),s.schema,s.blob_attribs,s.has_many,s.get_value(:md_size).to_i,s.read_state.refresh_time)
           end
         end
       end
     end
-    
+
     def self.create_sqlite_data_file(bulk_data,ts)
       sources_refs = {}
       schema,index,bulk_data.dbfile = get_file_args(bulk_data.name,ts)
       FileUtils.mkdir_p(File.dirname(bulk_data.dbfile))
       # TODO: remove old bulk files!
       # FileUtils.rm Dir.glob(File.join(Rhoconnect.data_directory, "#{bulk_data.name}*"))
-      db = DBAdapter.instance.get_connection(bulk_data.dbfile) 
+      db = DBAdapter.instance.get_connection(bulk_data.dbfile)
       db.execute_batch(File.open(schema,'r').read)
-      
+
       src_counter = 1
       selected_sources = {}
       bulk_data.sources[0, -1].each do |source|
         selected_sources[source] = true
       end
@@ -171,44 +171,44 @@
         if source.schema
           source_attrib_refs = import_data_to_fixed_schema(db,source,is_selected_source)
         else
           source_attrib_refs = import_data_to_object_values(db,source,is_selected_source)
         end
-        sources_refs[source_name] = 
+        sources_refs[source_name] =
           {:source => source, :refs => source_attrib_refs, :skip_source => !is_selected_source}
         lap_timer("finished importing sqlite data for #{source_name}",timer)
       end
       populate_sources_table(db,sources_refs)
-      
+
       db.execute_batch(File.open(index,'r').read)
       db.execute_batch("VACUUM;");
       db.close
-      
+
       compress("#{bulk_data.dbfile}.rzip",bulk_data.dbfile)
       gzip_compress("#{bulk_data.dbfile}.gzip",bulk_data.dbfile)
     end
-    
+
     def self.get_file_args(bulk_data_name,ts)
       schema = BulkData.schema_file
       index = BulkData.index_file
       dbfile = File.join(Rhoconnect.data_directory,bulk_data_name+'_'+ts+'.data')
       [schema,index,dbfile]
     end
-    
+
     def self.compress(archive,file)
-      Zip::ZipFile.open(archive, 'w') do |zipfile|
+      Zip::File.open(archive, 'w') do |zipfile|
        zipfile.add(URI.escape(File.basename(file)),file)
       end
     end
-    
+
     def self.gzip_compress(archive,file)
       data = File.new(file, "rb")
       File.open(archive, 'wb') do |f|
         gz = Zlib::GzipWriter.new(f)
         gz.write data.read
         gz.close
       end
       data.close
     end
-    
+
   end
 end
\ No newline at end of file
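The substantive change in this diff is the rubyzip upgrade: rubyzip 1.0 dropped the 0.9.x entry point require 'zip/zip' and renamed Zip::ZipFile to Zip::File, which is why both the require at the top and the compress method near the bottom change. A minimal sketch of the 1.x-style call, with a hypothetical file path (the gem passes 'w' as the second argument, which rubyzip merely treats as a truthy create flag; Zip::File::CREATE is the documented equivalent):

    require 'zip' # rubyzip >= 1.0; the old 'zip/zip' entry point is gone there

    dbfile  = '/tmp/example_20140101.data' # hypothetical path for illustration
    archive = "#{dbfile}.rzip"

    # Zip::File replaces the removed Zip::ZipFile constant; the block form
    # writes the central directory and closes the archive on exit.
    Zip::File.open(archive, Zip::File::CREATE) do |zipfile|
      zipfile.add(File.basename(dbfile), dbfile)
    end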
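Nearly everything else above is trailing-whitespace cleanup plus the commented-out counter in import_data_to_fixed_schema; the import logic itself is untouched. As context for import_data_to_object_values, here is a standalone sketch of the same prepared-insert pattern, talking to the sqlite3 gem directly. The real job obtains its handle through DBAdapter.instance.get_connection, so the gem choice, the source_id of 1, and the sample data are assumptions for illustration:

    require 'sqlite3'

    db = SQLite3::Database.new(':memory:')
    db.execute('CREATE TABLE object_values ' \
               '(source_id int, attrib varchar, object varchar, value varchar)')

    # :md-shaped sample data: object id => { attribute => value }.
    data = { 'obj-1' => { 'name' => 'Alice' }, 'obj-2' => { 'name' => 'Bob' } }

    db.transaction do
      # The block form of prepare closes the statement when the block exits.
      db.prepare('insert into object_values (source_id,attrib,object,value) ' \
                 'values (?,?,?,?)') do |stmt|
        data.each do |object_id, object|
          object.each do |attrib, value|
            args = [1, attrib, object_id, value]
            # The splat expands the array into one bind argument per '?',
            # the asterisk the JRuby comment in the diff insists on.
            stmt.execute(*args)
          end
        end
      end
    end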
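For the fixed-schema path, import_data_to_fixed_schema derives all of its SQL from the source's schema JSON: 'property' keys become varchar columns, and 'index' / 'unique_index' entries become CREATE INDEX statements. A worked example of the strings it builds, with an invented source name and schema:

    require 'json'

    # Hypothetical schema; real ones come from source.schema.
    schema = JSON.parse('{
      "property":     { "name": null, "price": null },
      "index":        { "product_by_name": "name" },
      "unique_index": { "product_by_sku":  "sku" }
    }')

    create_table = ["\"object\" varchar(255) PRIMARY KEY"]
    columns, qm = [], []
    schema['property'].each do |key, _value|
      create_table << "\"#{key}\" varchar default NULL"
      columns << key
      qm << '?'
    end

    puts "CREATE TABLE Product(#{create_table.join(",")} );"
    # CREATE TABLE Product("object" varchar(255) PRIMARY KEY,"name" varchar default NULL,"price" varchar default NULL );
    puts "insert into Product (object,#{columns.join(',')}) values (?,#{qm.join(',')})"
    # insert into Product (object,name,price) values (?,?,?)

    schema['index'].each do |key, value|
      cols = value.split(',').map { |col| "\"#{col}\"" }.join(',') # same output as the val2 loop
      puts "CREATE INDEX #{key} on Product (#{cols});"
    end
    # CREATE INDEX product_by_name on Product ("name");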