# frozen_string_literal: true module MalawiHivProgramReports module Moh # This is the Cumulative Cohort Builder class # rubocop:disable Metrics/ClassLength class CumulativeCohort include MalawiHivProgramReports::Utils::CommonSqlQueryUtils attr_reader :start_date, :end_date, :locations, :rebuild, :outcome, :definition LOCK_FILE = 'art_service/reports/cumulative_cohort.lock' def initialize(start_date:, end_date:, **kwargs) @start_date = ActiveRecord::Base.connection.quote(start_date) @end_date = ActiveRecord::Base.connection.quote(end_date) @rebuild = kwargs[:rebuild]&.casecmp?('true') locations = kwargs[:locations] @locations = locations.present? ? locations.split(',') : [] @definition = kwargs[:definition] end def find_report start_time = Time.now prepare_tables clear_tables if rebuild process_thread end_time = Time.now time_in_minutes = ((end_time - start_time) / 60).round(2) Rails.logger.info("Cumulative Cohort report took #{time_in_minutes} minutes to generate") { cohort_time: time_in_minutes } end private # we have these steps to build the cohort report for all patients nation wide # 1. We want to filter out patients who are drug refills and consultations as at the end of the quarter # 2. We want to check who in 1 was registered as facility client at the end of the quarter # 3. We want to the first ARV dispensation date for each patient and may not necessarily be in 2 above but as at the end of the quarter # 4. We want to get information on whether the clients in 3 above have ever registered somewhere else. This is to check if they are transfers in # 5. We want to get information on clients birthdate etc after joining all the above # 6. Finally we want to get the outcomes of the clients in 5 above # rubocop:disable Metrics/MethodLength def process_thread # use locations to thread but process 10 locations at a time queue = Queue.new locations.each { |loc| queue << loc } threads = Array.new(10) do Thread.new do until queue.empty? loc = begin queue.pop(true) rescue StandardError nil end next unless loc ActiveRecord::Base.connection_pool.with_connection do process_data loc end end end end threads.each(&:join) end # rubocop:enable Metrics/MethodLength def process_data(location) start_time = Time.now cdr_other_patient_types location external_clients location transfer_ins location min_drug_orders location potential_cohort_members location reason_for_starting_art location cohort_members location outcome = MalawiHivProgramReports::Moh::CumulativeOutcome.new(end_date:, location:, definition:, rebuild:) outcome.find_report end_time = Time.now time_taken = ((end_time - start_time) / 60).round(2) save_completed_site(location:, time_taken:) end # rubocop:disable Metrics/CyclomaticComplexity def prepare_tables create_cdr_other_patient_types unless check_if_table_exists('cdr_other_patient_types') create_temp_potential_cohort_members_table unless check_if_table_exists('cdr_temp_potential_cohort_members') create_min_drug_orders_table unless check_if_table_exists('cdr_temp_min_drug_orders') create_transfer_ins_table unless check_if_table_exists('cdr_temp_transfer_ins') create_external_clients_table unless check_if_table_exists('cdr_temp_external_clients') create_temp_cohort_members_table unless check_if_table_exists('cdr_temp_cohort_members') create_cdr_reason_for_starting_art unless check_if_table_exists('cdr_reason_for_starting_art') create_cdr_temp_cohort_status unless check_if_table_exists('cdr_temp_cohort_status') end # rubocop:enable Metrics/CyclomaticComplexity def save_completed_site(location:, time_taken:) ActiveRecord::Base.connection.execute <<~SQL INSERT INTO cdr_temp_cohort_status VALUES (#{location}, DATE(#{start_date}), DATE(#{end_date}), 'completed', #{time_taken}) SQL end def create_cdr_temp_cohort_status ActiveRecord::Base.connection.execute <<~SQL CREATE TABLE IF NOT EXISTS cdr_temp_cohort_status ( site_id INT NOT NULL, start_date DATE NOT NULL, end_date DATE NOT NULL, status VARCHAR(50) NOT NULL, time_taken DECIMAL(40,2) DEFAULT NULL, PRIMARY KEY (site_id, start_date, end_date) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 SQL end def create_external_clients_table ActiveRecord::Base.connection.execute <<~SQL CREATE TABLE IF NOT EXISTS cdr_temp_external_clients ( patient_id INT(11) NOT NULL, site_id INT(11) NOT NULL, patient_types VARCHAR(255) DEFAULT NULL, encounter_id INT(11) DEFAULT NULL, PRIMARY KEY (patient_id, site_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 PARTITION BY LIST(site_id) (#{partition_by_site}) SQL end def create_min_drug_orders_table ActiveRecord::Base.connection.execute <<~SQL CREATE TABLE IF NOT EXISTS cdr_temp_min_drug_orders ( patient_id INT(11) NOT NULL, site_id INT(11) NOT NULL, start_date DATE DEFAULT NULL, PRIMARY KEY (patient_id, site_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 PARTITION BY LIST(site_id) (#{partition_by_site}) SQL end def create_transfer_ins_table ActiveRecord::Base.connection.execute <<~SQL CREATE TABLE IF NOT EXISTS cdr_temp_transfer_ins ( patient_id INT(11) NOT NULL, site_id INT(11) NOT NULL, value_datetime DATE DEFAULT NULL, PRIMARY KEY (patient_id, site_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 PARTITION BY LIST(site_id) (#{partition_by_site}) SQL end # rubocop:disable Metrics/MethodLength def create_temp_potential_cohort_members_table ActiveRecord::Base.connection.execute <<~SQL CREATE TABLE IF NOT EXISTS cdr_temp_potential_cohort_members ( patient_id INT(11) NOT NULL, site_id INT(11) NOT NULL, birthdate DATE DEFAULT NULL, birthdate_estimated TINYINT(1) DEFAULT NULL, death_date DATE DEFAULT NULL, gender CHAR(1) DEFAULT NULL, PRIMARY KEY (patient_id, site_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 PARTITION BY LIST(site_id) (#{partition_by_site}) SQL end def create_temp_cohort_members_table ActiveRecord::Base.connection.execute <<~SQL CREATE TABLE IF NOT EXISTS cdr_temp_cohort_members ( patient_id INT(11) NOT NULL, site_id INT(11) NOT NULL, birthdate DATE DEFAULT NULL, birthdate_estimated TINYINT(1) DEFAULT NULL, death_date DATE DEFAULT NULL, gender CHAR(1) DEFAULT NULL, date_enrolled DATE DEFAULT NULL, earliest_start_date DATE DEFAULT NULL, recorded_start_date DATE DEFAULT NULL, age_at_initiation INT(11) DEFAULT NULL, age_in_days INT(11) DEFAULT NULL, reason_for_starting_art INT(11) DEFAULT NULL, PRIMARY KEY (patient_id, site_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 PARTITION BY LIST(site_id) (#{partition_by_site}) SQL create_cohort_member_indexes end # rubocop:enable Metrics/MethodLength def check_if_table_exists(table_name) result = ActiveRecord::Base.connection.select_one <<~SQL SELECT COUNT(*) AS count FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = '#{table_name}' SQL result['count'].to_i.positive? end def create_cohort_member_indexes ActiveRecord::Base.connection.execute <<~SQL CREATE INDEX idx_enrolled ON cdr_temp_cohort_members (date_enrolled) SQL ActiveRecord::Base.connection.execute <<~SQL CREATE INDEX idx_earliest ON cdr_temp_cohort_members (earliest_start_date) SQL ActiveRecord::Base.connection.execute <<~SQL CREATE INDEX idx_recorded ON cdr_temp_cohort_members (recorded_start_date) SQL end def create_cdr_other_patient_types ActiveRecord::Base.connection.execute <<~SQL CREATE TABLE IF NOT EXISTS cdr_other_patient_types ( patient_id INT(11) NOT NULL, site_id INT(11) NOT NULL, PRIMARY KEY (patient_id, site_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 PARTITION BY LIST(site_id) (#{partition_by_site}) SQL end def create_cdr_reason_for_starting_art ActiveRecord::Base.connection.execute <<~SQL CREATE TABLE IF NOT EXISTS cdr_reason_for_starting_art ( patient_id INT(11) NOT NULL, site_id INT(11) NOT NULL, reason_for_starting_art INT(11) DEFAULT NULL, PRIMARY KEY (patient_id, site_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 PARTITION BY LIST(site_id) (#{partition_by_site}) SQL create_cdr_reason_for_starting_art_indexes end def create_cdr_reason_for_starting_art_indexes ActiveRecord::Base.connection.execute <<~SQL CREATE INDEX idx_reason_for_starting_art ON cdr_reason_for_starting_art (reason_for_starting_art) SQL end # rubocop:disable Metrics/MethodLength def cohort_members(location) ActiveRecord::Base.connection.execute <<~SQL INSERT INTO cdr_temp_cohort_members PARTITION (p#{location}) SELECT pcm.patient_id, pcm.site_id, pcm.birthdate, pcm.birthdate_estimated, pcm.death_date, pcm.gender, mdo.start_date AS date_enrolled, COALESCE(ti.value_datetime, mdo.start_date) AS earliest_start_date, ti.value_datetime AS recorded_start_date, IF(pcm.birthdate IS NOT NULL, TIMESTAMPDIFF(YEAR, pcm.birthdate, COALESCE(ti.value_datetime, mdo.start_date)), NULL) AS age_at_initiation, IF(pcm.birthdate IS NOT NULL, TIMESTAMPDIFF(DAY, pcm.birthdate, COALESCE(ti.value_datetime, mdo.start_date)), NULL) AS age_in_days, rfsa.reason_for_starting_art FROM cdr_temp_potential_cohort_members pcm INNER JOIN cdr_temp_min_drug_orders mdo ON mdo.patient_id = pcm.patient_id AND mdo.site_id = pcm.site_id INNER JOIN cdr_reason_for_starting_art rfsa ON rfsa.patient_id = pcm.patient_id AND rfsa.site_id = pcm.site_id LEFT JOIN cdr_temp_transfer_ins ti ON ti.patient_id = pcm.patient_id AND ti.site_id = pcm.site_id WHERE pcm.site_id = #{location} HAVING reason_for_starting_art IS NOT NULL ON DUPLICATE KEY UPDATE birthdate = VALUES(birthdate), birthdate_estimated = VALUES(birthdate_estimated), death_date = VALUES(death_date), gender = VALUES(gender), date_enrolled = VALUES(date_enrolled), earliest_start_date = VALUES(earliest_start_date), recorded_start_date = VALUES(recorded_start_date), age_at_initiation = VALUES(age_at_initiation), age_in_days = VALUES(age_in_days), reason_for_starting_art = VALUES(reason_for_starting_art) SQL end def potential_cohort_members(location) ActiveRecord::Base.connection.execute <<~SQL INSERT INTO cdr_temp_potential_cohort_members PARTITION (p#{location}) SELECT pp.patient_id, pp.site_id, p.birthdate, p.birthdate_estimated, p.death_date, LEFT(p.gender, 1) gender FROM patient_program pp INNER JOIN person p ON p.person_id = pp.patient_id AND p.site_id = pp.site_id AND p.voided = 0 INNER JOIN patient_state ps ON ps.patient_program_id = pp.patient_program_id AND ps.site_id = pp.site_id AND ps.voided = 0 AND ps.state = 7 AND ps.start_date IS NOT NULL -- 7 is On antiretrovirals AND ps.start_date < (DATE(#{end_date}) + INTERVAL 1 DAY) WHERE pp.program_id = 1 AND pp.voided = 0 AND (pp.patient_id, pp.site_id) NOT IN (SELECT patient_id, site_id FROM cdr_temp_external_clients) AND pp.site_id = #{location} GROUP BY pp.patient_id, pp.site_id ON DUPLICATE KEY UPDATE birthdate = VALUES(birthdate), birthdate_estimated = VALUES(birthdate_estimated), death_date = VALUES(death_date), gender = VALUES(gender) SQL end def min_drug_orders(location) ActiveRecord::Base.connection.execute <<~SQL INSERT INTO cdr_temp_min_drug_orders PARTITION (p#{location}) SELECT o.patient_id, o.site_id, DATE(MIN(o.start_date)) start_date FROM orders o INNER JOIN drug_order do ON do.order_id = o.order_id AND do.site_id = o.site_id AND do.quantity > 0 LEFT JOIN ( SELECT o.person_id patient_id, o.site_id, DATE(MIN(o.obs_datetime)) registered_date FROM obs o INNER JOIN cdr_other_patient_types other ON other.patient_id = o.person_id AND other.site_id = o.site_id WHERE o.concept_id = 3289 -- Type of patient AND o.value_coded = 7572 -- New patient AND o.voided = 0 AND o.obs_datetime < (DATE(#{end_date}) + INTERVAL 1 DAY) GROUP BY o.person_id, o.site_id ) np ON np.patient_id = o.patient_id AND np.site_id = o.site_id WHERE o.voided = 0 AND o.start_date < (DATE(#{end_date}) + INTERVAL 1 DAY) AND o.start_date > COALESCE(np.registered_date, DATE('1900-01-01')) AND o.concept_id IN (SELECT concept_id FROM concept_set WHERE concept_set = 1085) -- 1085 is ARV DRUGS AND o.order_type_id = 1 AND o.site_id = #{location} GROUP BY o.patient_id, o.site_id ON DUPLICATE KEY UPDATE start_date = VALUES(start_date) SQL end def transfer_ins(location) ActiveRecord::Base.connection.execute <<~SQL INSERT INTO cdr_temp_transfer_ins PARTITION (p#{location}) SELECT o.person_id, o.site_id, DATE(MIN(o.value_datetime)) value_datetime FROM obs o INNER JOIN encounter e ON e.patient_id = o.person_id AND e.site_id = o.site_id AND e.encounter_id = o.encounter_id AND e.program_id = 1 AND e.encounter_datetime < (DATE(#{end_date}) + INTERVAL 1 DAY) AND e.encounter_type = 9 -- HIV CLINIC REGISTRATION AND e.voided = 0 WHERE o.concept_id = 2516 AND o.voided = 0 AND o.obs_datetime < (DATE(#{end_date}) + INTERVAL 1 DAY) -- 2516 is Date antiretrovirals started AND o.site_id = #{location} GROUP BY o.person_id, o.site_id ON DUPLICATE KEY UPDATE value_datetime = VALUES(value_datetime) SQL end def external_clients(location) ActiveRecord::Base.connection.execute <<~SQL INSERT INTO cdr_temp_external_clients PARTITION (p#{location}) SELECT e.patient_id, e.site_id, GROUP_CONCAT(DISTINCT(patient_type.value_coded)) AS patient_types, clinic_registration.encounter_id FROM patient_program as e INNER JOIN obs AS patient_type ON patient_type.person_id = e.patient_id AND patient_type.site_id = e.site_id AND patient_type.voided = 0 AND patient_type.concept_id = 3289 -- Type of patient AND patient_type.obs_datetime < DATE(#{end_date}) + INTERVAL 1 DAY LEFT JOIN encounter as clinic_registration ON clinic_registration.patient_id = e.patient_id AND clinic_registration.site_id = e.site_id AND clinic_registration.program_id = 1 AND clinic_registration.encounter_type = 9 -- HIV CLINIC REGISTRATION AND clinic_registration.encounter_datetime < DATE(#{end_date}) + INTERVAL 1 DAY AND clinic_registration.voided = 0 WHERE e.program_id = 1 -- HIV program AND e.voided = 0 AND e.site_id = #{location} -- AND clinic_registration.encounter_id IS NOT NULL -- bone of contention GROUP BY e.patient_id, e.site_id HAVING FIND_IN_SET('7572', patient_types) = 0 AND encounter_id IS NULL ON DUPLICATE KEY UPDATE patient_types = VALUES(patient_types) SQL end def cdr_other_patient_types(location) ActiveRecord::Base.connection.execute <<~SQL INSERT INTO cdr_other_patient_types PARTITION (p#{location}) SELECT o.person_id, o.site_id FROM obs o WHERE o.concept_id = 3289 -- Type of patient AND o.value_coded != 7572 -- New patient AND o.voided = 0 AND o.obs_datetime < (DATE(#{end_date}) + INTERVAL 1 DAY) AND o.site_id = #{location} GROUP BY o.person_id, o.site_id -- on duplicate just ignore ON DUPLICATE KEY UPDATE site_id = VALUES(site_id) SQL end def reason_for_starting_art(location) ActiveRecord::Base.connection.execute <<~SQL INSERT INTO cdr_reason_for_starting_art PARTITION (p#{location}) SELECT a.person_id, a.site_id, a.value_coded FROM obs a INNER JOIN cdr_temp_potential_cohort_members ct ON ct.patient_id = a.person_id AND ct.site_id = a.site_id LEFT OUTER JOIN obs b ON a.person_id = b.person_id AND a.site_id = b.site_id AND b.concept_id = a.concept_id AND b.concept_id = 7563 AND a.obs_datetime < b.obs_datetime AND b.voided = 0 AND b.obs_datetime < DATE(#{end_date}) + INTERVAL 1 DAY AND a.obs_datetime < DATE(#{end_date}) + INTERVAL 1 DAY WHERE b.obs_id IS NULL AND a.concept_id = 7563 AND a.voided = 0 AND a.site_id = #{location} GROUP BY a.person_id, a.site_id ON DUPLICATE KEY UPDATE reason_for_starting_art = VALUES(reason_for_starting_art) SQL end # rubocop:enable Metrics/MethodLength def clear_tables # if locations is empty then we truncating otherwise we clear the locations if locations.empty? ActiveRecord::Base.connection.execute('TRUNCATE TABLE cdr_other_patient_types') ActiveRecord::Base.connection.execute('TRUNCATE TABLE cdr_temp_potential_cohort_members') ActiveRecord::Base.connection.execute('TRUNCATE TABLE cdr_temp_min_drug_orders') ActiveRecord::Base.connection.execute('TRUNCATE TABLE cdr_temp_transfer_ins') ActiveRecord::Base.connection.execute('TRUNCATE TABLE cdr_temp_cohort_members') ActiveRecord::Base.connection.execute('TRUNCATE TABLE cdr_temp_external_clients') ActiveRecord::Base.connection.execute('TRUNCATE TABLE cdr_reason_for_starting_art') ActiveRecord::Base.connection.execute('TRUNCATE TABLE cdr_temp_cohort_status') else ActiveRecord::Base.connection.execute("DELETE FROM cdr_other_patient_types WHERE site_id IN (#{locations.join(',')})") ActiveRecord::Base.connection.execute("DELETE FROM cdr_temp_potential_cohort_members WHERE site_id IN (#{locations.join(',')})") ActiveRecord::Base.connection.execute("DELETE FROM cdr_temp_min_drug_orders WHERE site_id IN (#{locations.join(',')})") ActiveRecord::Base.connection.execute("DELETE FROM cdr_temp_transfer_ins WHERE site_id IN (#{locations.join(',')})") ActiveRecord::Base.connection.execute("DELETE FROM cdr_temp_cohort_members WHERE site_id IN (#{locations.join(',')})") ActiveRecord::Base.connection.execute("DELETE FROM cdr_temp_external_clients WHERE site_id IN (#{locations.join(',')})") ActiveRecord::Base.connection.execute("DELETE FROM cdr_reason_for_starting_art WHERE site_id IN (#{locations.join(',')})") ActiveRecord::Base.connection.execute("DELETE FROM cdr_temp_cohort_status WHERE site_id IN (#{locations.join(',')})") end end end # rubocop:enable Metrics/ClassLength end end