lib/health-data-standards/import/bundle/importer.rb in health-data-standards-3.0.6 vs lib/health-data-standards/import/bundle/importer.rb in health-data-standards-3.1.0

- old
+ new

@@ -1,148 +1,180 @@ +require 'zip/zipfilesystem' module HealthDataStandards - module Import - module Bundle - - class Importer - COLLECTION_NAMES = ["bundles", "records", "measures", "selected_measures", "patient_cache", "query_cache", "system.js"] - DEFAULTS = {clear_db: false, - type: nil, - delete_existing: false, - update_measures: true, - clear_collections: COLLECTION_NAMES - } - # Import a quality bundle into the database. This includes metadata, measures, test patients, supporting JS libraries, and expected results. - # - # @param [File] zip The bundle zip file. - # @param [String] Type of measures to import, either 'ep', 'eh' or nil for all - # @param [Boolean] keep_existing If true, delete all current collections related to patients and measures. - def self.import(zip, options={}) - options = DEFAULTS.merge(options) - bundle_versions = Hash[* HealthDataStandards::CQM::Bundle.where({}).collect{|b| [b._id, b.version]}.flatten] - # Unpack content from the bundle. - bundle_contents = unpack_bundle_contents(zip, options[:type]) - bundle = HealthDataStandards::CQM::Bundle.new( JSON.parse(bundle_contents[:bundle])) - - if bundle_versions.invert[bundle.version] && !(options[:delete_existing] || options[:clear_db]) - raise "A bundle with version #{bundle.version} already exists in the database. " - end + module Import + module Bundle + + class Importer - drop_collections(COLLECTION_NAMES+(options[:clear_collections]||[])) if options[:clear_db] - HealthDataStandards::CQM::Bundle.where({:version => bundle.version}).each {|b| b.delete} - # Store all JS libraries. - bundle_contents[:extensions].each do |key, contents| - save_system_js_fn(key, contents) - end + SOURCE_ROOTS = {bundle: 'bundle.json', + libraries: File.join('library_functions','*.js'), + measures: 'measures', results: 'results', + valuesets: File.join('value_sets','json','*.json'), + patients: 'patients'} + COLLECTION_NAMES = ["bundles", "records", "measures", "selected_measures", "patient_cache", "query_cache", "system.js"] + CLEAR_ONLY_COLLECTIONS = ["system.js"] + DEFAULTS = {type: nil, + delete_existing: false, + update_measures: true, + clear_collections: COLLECTION_NAMES + } - # Store the bundle metadata. + # Import a quality bundle into the database. This includes metadata, measures, test patients, supporting JS libraries, and expected results. + # + # @param [File] zip The bundle zip file. + # @param [String] Type of measures to import, either 'ep', 'eh' or nil for all + # @param [Boolean] keep_existing If true, delete all current collections related to patients and measures. + def self.import(zip, options={}) + options = DEFAULTS.merge(options) - - unless bundle.save - raise bundle.errors.full_messages.join(",") - end + bundle = nil + Zip::ZipFile.open(zip.path) do |zip_file| - bundle_id = bundle.id - - - # Store all measures. - bundle_contents[:measures].each do |key, contents| - json = JSON.parse(contents, {:max_nesting => 100}) - measure = json.clone - # measure = HealthDataStandards::CQM::Measure.new(json) - measure['bundle_id'] = bundle_id - Mongoid.default_session["measures"].insert(measure) - + bundle = unpack_bundle(zip_file) - if options[:update_measures] - Mongoid.default_session["measures"].where({hqmf_id: measure["hqmf_id"], sub_id: measure["sub_id"]}).each do |m| - b = HealthDataStandards::CQM::Bundle.find(m["bundle_id"]) - if b.version < bundle.version - m.merge!(json) - Mongoid.default_session["measures"].where({"_id" => m["_id"]}).update(m) - end + bundle_versions = Hash[* HealthDataStandards::CQM::Bundle.where({}).collect{|b| [b._id, b.version]}.flatten] + if bundle_versions.invert[bundle.version] && !(options[:delete_existing]) + raise "A bundle with version #{bundle.version} already exists in the database. " + end - end - end - end - - - bundle_contents[:patients].each do |key, contents| - patient = Record.new( JSON.parse(contents, {:max_nesting => 100})) - patient['bundle_id'] = bundle_id - patient.save - end + HealthDataStandards::CQM::Bundle.where({:version => bundle.version}).each do |b| + puts "deleting existing bundle version: #{b.version}" + b.delete + end if options[:delete_existing] - bundle_contents[:valuesets].each do |key, contents| - json = JSON.parse(contents, {:max_nesting => 100}) - vs = HealthDataStandards::SVS::ValueSet.new(json) - vs['bundle_id'] = bundle_id - vs.save - end - - # Store the expected results into the query and patient caches. - bundle_contents[:results].each do |name, contents| - collection = name == "by_patient" ? "patient_cache" : "query_cache" - contents = JSON.parse(contents, {:max_nesting => 100}) + unpack_and_store_system_js(zip_file) - contents.each {|document| - document['bundle_id'] = bundle_id - Mongoid.default_session[collection].insert(document) - } + # Store the bundle metadata. + unless bundle.save + raise bundle.errors.full_messages.join(",") + end + puts "bundle metadata unpacked..." - end - - bundle - end + measure_ids = unpack_and_store_measures(zip_file, options[:type], bundle, options[:update_measures]) + unpack_and_store_patients(zip_file, options[:type], bundle) + unpack_and_store_valuesets(zip_file, bundle) + unpack_and_store_results(zip_file, options[:type], measure_ids, bundle) - # Delete a list of collections. By default, this function drops all of collections related to measures and patients. - # - # @param [Array] collection_names Optionally, an array of collection names to be dropped. - def self.drop_collections(collection_names=[]) - collection_names = COLLECTION_NAMES if collection_names.empty? - collection_names.each {|collection| Mongoid.default_session[collection].drop} - end + end - # Save a javascript function into Mongo's system.js collection for measure execution. - # - # @param [String] name The name by which the function will be referred. - # @param [String] fn The body of the function being saved. - def self.save_system_js_fn(name, fn) - fn = "function () {\n #{fn} \n }" - Mongoid.default_session['system.js'].find('_id' => name).upsert( - { - "_id" => name, - "value" => Moped::BSON::Code.new(fn) - } - ) - end + bundle + end - # A utility function for finding files in a bundle. Strip a file path of it's extension and just give the filename. - # - # @param [String] original A file path. - # @param [String] extension A file extension. - # @return The filename at the end of the original String path with the extension removed. e.g. "/boo/urns.html" -> "urns" - def self.entry_key(original, extension) - original.split('/').last.gsub(".#{extension}", '') - end - def self.unpack_bundle_contents(zip, type = nil) - bundle_contents = { bundle: nil, measures: {}, patients: {}, extensions: {}, results: {}, valuesets: {} } - Zip::ZipFile.open(zip.path) do |zipfile| - zipfile.entries.each do |entry| - bundle_contents[:bundle] = zipfile.read(entry.name) if entry.name.include? "bundle" - if type.nil? || entry.name.match(Regexp.new("/#{type}/")) - bundle_contents[:measures][entry_key(entry.name, "json")] = zipfile.read(entry.name) if entry.name.match /^measures.*\.json$/ - bundle_contents[:patients][entry_key(entry.name, "json")] = zipfile.read(entry.name) if entry.name.match /^patients.*\.json$/ # Only need to import one of the formats - bundle_contents[:results][entry_key(entry.name,"json")] = zipfile.read(entry.name) if entry.name.match /^results.*\.json/ - end - bundle_contents[:extensions][entry_key(entry.name,"js")] = zipfile.read(entry.name) if entry.name.match /^library_functions.*\.js/ + # Save a javascript function into Mongo's system.js collection for measure execution. + # + # @param [String] name The name by which the function will be referred. + # @param [String] fn The body of the function being saved. + def self.save_system_js_fn(name, fn) + fn = "function () {\n #{fn} \n }" + Mongoid.default_session['system.js'].find('_id' => name).upsert( + { + "_id" => name, + "value" => Moped::BSON::Code.new(fn) + } + ) + end - bundle_contents[:valuesets][entry_key(entry.name,"json")] = zipfile.read(entry.name) if entry.name.match /^value_sets.*\.json/ - end - end - bundle_contents - end - end + # A utility function for finding files in a bundle. Strip a file path of it's extension and just give the filename. + # + # @param [String] original A file path. + # @param [String] extension A file extension. + # @return The filename at the end of the original String path with the extension removed. e.g. "/boo/urns.html" -> "urns" + def self.entry_key(original, extension) + original.split('/').last.gsub(".#{extension}", '') + end - end - end + def self.unpack_bundle(zip) + HealthDataStandards::CQM::Bundle.new(JSON.parse(zip.read(SOURCE_ROOTS[:bundle]),max_nesting: 100)) + end + + def self.unpack_and_store_system_js(zip) + zip.glob(SOURCE_ROOTS[:libraries]).each do |entry| + name = Pathname.new(entry.name).basename('.js').to_s + contents = entry.get_input_stream.read + save_system_js_fn(name, contents) + end + end + + def self.unpack_and_store_measures(zip, type, bundle, update_measures) + measure_ids = [] + entries = zip.glob(File.join(SOURCE_ROOTS[:measures],type || '**','*.json')) + entries.each_with_index do |entry, index| + source_measure = unpack_json(entry) + # we clone so that we have a source without a bundle id + measure = source_measure.clone + measure_ids << measure['id'] + measure['bundle_id'] = bundle.id + Mongoid.default_session["measures"].insert(measure) + + if update_measures + Mongoid.default_session["measures"].where({hqmf_id: measure["hqmf_id"], sub_id: measure["sub_id"]}).each do |m| + b = HealthDataStandards::CQM::Bundle.find(m["bundle_id"]) + if b.version < bundle.version + m.merge!(source_measure) + Mongoid.default_session["measures"].where({"_id" => m["_id"]}).update(m) + end + end + end + report_progress('measures', (index*100/entries.length)) if index%10 == 0 + end + puts "\rLoading: Measures Complete " + measure_ids + end + + def self.unpack_and_store_patients(zip, type, bundle) + entries = zip.glob(File.join(SOURCE_ROOTS[:patients],type || '**','json','*.json')) + entries.each_with_index do |entry, index| + patient = Record.new(unpack_json(entry)) + patient['bundle_id'] = bundle.id + patient.save + report_progress('patients', (index*100/entries.length)) if index%10 == 0 + end + puts "\rLoading: Patients Complete " + end + + def self.unpack_and_store_valuesets(zip, bundle) + entries = zip.glob(SOURCE_ROOTS[:valuesets]) + bulk = [] + entries.each_with_index do |entry, index| + vs = HealthDataStandards::SVS::ValueSet.new(unpack_json(entry)) + vs['bundle_id'] = bundle.id + bulk << vs + report_progress('Value Sets', (index*100/entries.length)) if index%10 == 0 + end + HealthDataStandards::SVS::ValueSet.collection.insert(bulk.map {|vs| vs.as_document}) + puts "\rLoading: Value Sets Complete " + end + + def self.unpack_and_store_results(zip, type, measure_ids, bundle) + zip.glob(File.join(SOURCE_ROOTS[:results],'*.json')).each do |entry| + name = Pathname.new(entry.name).basename('.json').to_s + collection = (name == "by_patient") ? "patient_cache" : "query_cache" + + contents = unpack_json(entry) + + if (type) + contents.select! {|entry| measure_ids.include? entry['measure_id']} if collection == 'query_cache' + contents.select! {|entry| measure_ids.include? entry['value']['measure_id']} if collection == 'patient_cache' + end + + contents.each do |document| + document['bundle_id'] = bundle.id + Mongoid.default_session[collection].insert(document) + end + end + puts "\rLoading: Results Complete " + end + + def self.unpack_json(entry) + JSON.parse(entry.get_input_stream.read,:max_nesting => 100) + end + + def self.report_progress(label, percent) + print "\rLoading: #{label} #{percent}% complete" + STDOUT.flush + end + + end + end + end end