# frozen_string_literal: true

module MalawiHivProgramReports
  module Moh
    # Builds the cumulative cohort staging tables (cdr_*) site by site and then hands
    # each site over to CumulativeOutcome for outcome processing.
    #
    # Every site is processed independently against its own table partition
    # (p<site_id>); the result of each site is recorded in cdr_temp_cohort_status as
    # either 'completed' or 'incomplete' so that failed sites can be retried.
    #
    # rubocop:disable Metrics/ClassLength
    class CumulativeCohort
      include MalawiHivProgramReports::Utils::CommonSqlQueryUtils

      attr_reader :start_date, :end_date, :locations, :rebuild, :outcome, :definition

      LOCK_FILE = 'art_service/reports/cumulative_cohort.lock'

      # Upper bound on the number of worker threads used to process sites concurrently.
      MAX_THREADS = 40

      # Number of threaded retry passes over failed sites before the final serial pass.
      MAX_RETRIES = 4

      # Per-site staging tables that are emptied on a rebuild.
      STAGING_TABLES = %w[
        cdr_other_patient_types
        cdr_temp_potential_cohort_members
        cdr_temp_min_drug_orders
        cdr_temp_transfer_ins
        cdr_temp_cohort_members
        cdr_temp_external_clients
        cdr_reason_for_starting_art
      ].freeze

      # Table name => private method that creates it (kept in creation order).
      TABLE_BUILDERS = {
        'cdr_other_patient_types' => :create_cdr_other_patient_types,
        'cdr_temp_potential_cohort_members' => :create_temp_potential_cohort_members_table,
        'cdr_temp_min_drug_orders' => :create_min_drug_orders_table,
        'cdr_temp_transfer_ins' => :create_transfer_ins_table,
        'cdr_temp_external_clients' => :create_external_clients_table,
        'cdr_temp_cohort_members' => :create_temp_cohort_members_table,
        'cdr_reason_for_starting_art' => :create_cdr_reason_for_starting_art,
        'cdr_temp_cohort_status' => :create_cdr_temp_cohort_status
      }.freeze

      private_constant :MAX_THREADS, :MAX_RETRIES, :STAGING_TABLES, :TABLE_BUILDERS

      # @param start_date [Object] report start date; stored SQL-quoted
      # @param end_date [Object] report end date; stored SQL-quoted
      # @param kwargs [Hash] optional settings:
      #   - :rebuild    'true'/'false' (String) or a boolean; nil is kept as nil
      #   - :locations  comma separated site ids (an Array of ids is accepted too)
      #   - :definition 'moh' or 'pepfar' (case-insensitive), defaults to 'pepfar'
      # @raise [ArgumentError] on an unknown definition or a non-numeric location
      def initialize(start_date:, end_date:, **kwargs)
        @start_date = ActiveRecord::Base.connection.quote(start_date)
        @end_date = ActiveRecord::Base.connection.quote(end_date)

        # to_s lets callers pass a real boolean as well as the 'true'/'false' strings.
        rebuild_flag = kwargs[:rebuild]
        @rebuild = rebuild_flag.nil? ? nil : rebuild_flag.to_s.casecmp?('true')

        @locations = parse_locations(kwargs[:locations])
        @definition = kwargs[:definition] || 'pepfar'

        normalised = @definition.downcase
        return if %w[moh pepfar].include?(normalised)

        raise ArgumentError, "Invalid outcomes definition: #{normalised}"
      end

      # Runs the build for all configured locations, then retries the failed ones.
      #
      # @return [Hash] { cohort_time:, processing_failed: } - minutes spent on the main
      #   pass and on the retry passes respectively
      def find_report
        start_time = Time.now
        handle_tables
        clear_cohort_status unless rebuild
        process_thread(locations:)
        end_time = Time.now
        time_in_minutes = ((end_time - start_time) / 60).round(2)

        handle_failed
        time_taken_for_failed = ((Time.now - end_time) / 60).round(2)

        Rails.logger.info("Cumulative Cohort report took #{time_in_minutes} minutes to generate for these locations: #{locations}")
        { cohort_time: time_in_minutes, processing_failed: time_taken_for_failed }
      end

      private

      # Steps used to build the cohort report for all patients nation wide:
      # 1. Filter out patients who are drug refills and consultations as at the end of the quarter.
      # 2. Check who in 1 was registered as a facility client at the end of the quarter.
      # 3. Get the first ARV dispensation date for each patient (as at the end of the quarter);
      #    these patients may not necessarily be in 2 above.
      # 4. Find out whether the clients in 3 have ever registered somewhere else, i.e. whether
      #    they are transfers in.
      # 5. Get the clients' birthdate etc. after joining all of the above.
      # 6. Finally, get the outcomes of the clients in 5 above.

      # Normalises the :locations option into an array of digit-only site id strings.
      # The ids are interpolated into SQL (IN lists and partition names), so anything
      # that is not a plain non-negative integer is rejected up front.
      #
      # @raise [ArgumentError] when an entry is not a non-negative integer
      def parse_locations(raw)
        return [] unless raw.present?

        sites = raw.is_a?(String) ? raw.split(',') : Array(raw)
        sites.map do |site|
          site_id = site.to_s.strip
          raise ArgumentError, "Invalid location: #{site_id}" unless site_id.match?(/\A\d+\z/)

          site_id
        end
      end

      # Processes the given locations concurrently: the locations are queued and drained
      # by at most MAX_THREADS worker threads, each handling one location at a time.
      def process_thread(locations: [])
        queue = Queue.new
        locations.each { |loc| queue << loc }

        workers = Array.new([MAX_THREADS, queue.size].min) do
          Thread.new do
            until queue.empty?
              loc = begin
                queue.pop(true)
              rescue ThreadError
                nil # another worker emptied the queue between the check and the pop
              end
              next unless loc

              process_location_safely(loc)
            end
          end
        end
        workers.each(&:join)
      end

      # Processes one location on a pooled connection. Any error is logged and the
      # location is recorded as 'incomplete' instead of propagating to the caller.
      def process_location_safely(location)
        ActiveRecord::Base.connection_pool.with_connection do
          process_data location
        rescue StandardError => e
          Rails.logger.info("Error processing location #{location}: #{e.message}")
          Rails.logger.info(e.backtrace.join("\n"))
          save_incomplete_site(location:, time_taken: 0)
        end
      end

      # One threaded retry pass over the sites currently marked 'incomplete'.
      def reprocess_failed
        failed_locs = failed_site_ids
        return if failed_locs.empty?

        # Drop the stale failure rows first so a site that succeeds this time is no
        # longer reported as failed (a new row is written if it fails again).
        clear_incomplete_status(failed_locs)
        process_thread(locations: failed_locs)
      end

      # Retries failed sites: MAX_RETRIES threaded passes followed by one serial pass
      # over whatever is still failing.
      def handle_failed
        return if failed_site_ids.empty?

        MAX_RETRIES.times { reprocess_failed }

        failed_locs = failed_site_ids
        return if failed_locs.empty?

        clear_incomplete_status(failed_locs)
        failed_locs.each { |loc| process_location_safely(loc) }
      end

      # @return [Array<Integer>] distinct ids of the requested sites marked 'incomplete'
      def failed_site_ids
        # Result rows are keyed by column name as a String, not a Symbol.
        failed_sites.map { |row| row['site_id'].to_i }.uniq
      end

      # Status rows of the requested sites that are marked 'incomplete'.
      def failed_sites
        # No requested sites means nothing was processed; also avoids an invalid `IN ()`.
        return [] if locations.empty?

        ActiveRecord::Base.connection.select_all <<~SQL
          SELECT site_id
          FROM cdr_temp_cohort_status
          WHERE status = 'incomplete' AND site_id IN (#{locations.join(',')})
        SQL
      end

      # Removes the 'incomplete' status rows of the given sites ahead of a retry.
      def clear_incomplete_status(site_ids)
        ActiveRecord::Base.connection.execute <<~SQL
          DELETE FROM cdr_temp_cohort_status
          WHERE status = 'incomplete' AND site_id IN (#{site_ids.join(',')})
        SQL
      end

      # Creates missing tables, empties them on a rebuild and lets CumulativeOutcome
      # set up its own tables.
      def handle_tables
        prepare_tables
        clear_tables if rebuild
        outcome = MalawiHivProgramReports::Moh::CumulativeOutcome.new(end_date:, location: 0, definition:, rebuild:,
                                                                      start_date:)
        outcome.handle_tables
      end

      # Builds the staging data (rebuild only) and the outcomes for one location, then
      # records the location as 'completed' together with the minutes taken.
      def process_data(location)
        start_time = Time.now
        if rebuild
          # Order matters: later steps read the tables populated by earlier ones.
          cdr_other_patient_types location
          external_clients location
          transfer_ins location
          min_drug_orders location
          potential_cohort_members location
          reason_for_starting_art location
          cohort_members location
        end
        outcome = MalawiHivProgramReports::Moh::CumulativeOutcome.new(end_date:, location:, definition:, rebuild:,
                                                                      start_date:)
        rebuild ? outcome.find_report : outcome.update_outcomes_by_definition
        time_taken = ((Time.now - start_time) / 60).round(2)
        save_completed_site(location:, time_taken:)
      end

      # Creates every staging/status table that does not exist yet.
      def prepare_tables
        TABLE_BUILDERS.each do |table, builder|
          send(builder) unless check_if_table_exists(table)
        end
      end

      # Records a successfully processed site.
      def save_completed_site(location:, time_taken:)
        save_site_status(location:, status: 'completed', time_taken:)
      end

      # Records a site whose processing raised an error.
      def save_incomplete_site(location:, time_taken:)
        save_site_status(location:, status: 'incomplete', time_taken:)
      end

      # Inserts one row into cdr_temp_cohort_status; status is an internal literal.
      def save_site_status(location:, status:, time_taken:)
        ActiveRecord::Base.connection.execute <<~SQL
          INSERT INTO cdr_temp_cohort_status
          VALUES (#{location}, DATE(#{start_date}), DATE(#{end_date}), '#{status}', #{time_taken})
        SQL
      end

      def create_cdr_temp_cohort_status
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE TABLE IF NOT EXISTS cdr_temp_cohort_status (
            site_id INT NOT NULL,
            start_date DATE NOT NULL,
            end_date DATE NOT NULL,
            status VARCHAR(50) NOT NULL,
            time_taken DECIMAL(40,2) DEFAULT NULL
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8
        SQL
      end

      def create_external_clients_table
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE TABLE IF NOT EXISTS cdr_temp_external_clients (
            patient_id INT(11) NOT NULL,
            site_id INT(11) NOT NULL,
            patient_types VARCHAR(255) DEFAULT NULL,
            encounter_id INT(11) DEFAULT NULL,
            PRIMARY KEY (patient_id, site_id)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8
          PARTITION BY LIST(site_id) (#{partition_by_site})
        SQL
      end

      def create_min_drug_orders_table
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE TABLE IF NOT EXISTS cdr_temp_min_drug_orders (
            patient_id INT(11) NOT NULL,
            site_id INT(11) NOT NULL,
            start_date DATE DEFAULT NULL,
            PRIMARY KEY (patient_id, site_id)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8
          PARTITION BY LIST(site_id) (#{partition_by_site})
        SQL
      end

      def create_transfer_ins_table
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE TABLE IF NOT EXISTS cdr_temp_transfer_ins (
            patient_id INT(11) NOT NULL,
            site_id INT(11) NOT NULL,
            value_datetime DATE DEFAULT NULL,
            PRIMARY KEY (patient_id, site_id)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8
          PARTITION BY LIST(site_id) (#{partition_by_site})
        SQL
      end

      # rubocop:disable Metrics/MethodLength
      def create_temp_potential_cohort_members_table
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE TABLE IF NOT EXISTS cdr_temp_potential_cohort_members (
            patient_id INT(11) NOT NULL,
            site_id INT(11) NOT NULL,
            birthdate DATE DEFAULT NULL,
            birthdate_estimated TINYINT(1) DEFAULT NULL,
            death_date DATE DEFAULT NULL,
            gender CHAR(1) DEFAULT NULL,
            PRIMARY KEY (patient_id, site_id)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8
          PARTITION BY LIST(site_id) (#{partition_by_site})
        SQL
      end

      def create_temp_cohort_members_table
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE TABLE IF NOT EXISTS cdr_temp_cohort_members (
            patient_id INT(11) NOT NULL,
            site_id INT(11) NOT NULL,
            birthdate DATE DEFAULT NULL,
            birthdate_estimated TINYINT(1) DEFAULT NULL,
            death_date DATE DEFAULT NULL,
            gender CHAR(1) DEFAULT NULL,
            date_enrolled DATE DEFAULT NULL,
            earliest_start_date DATE DEFAULT NULL,
            recorded_start_date DATE DEFAULT NULL,
            age_at_initiation INT(11) DEFAULT NULL,
            age_in_days INT(11) DEFAULT NULL,
            reason_for_starting_art INT(11) DEFAULT NULL,
            PRIMARY KEY (patient_id, site_id)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8
          PARTITION BY LIST(site_id) (#{partition_by_site})
        SQL
        create_cohort_member_indexes
      end
      # rubocop:enable Metrics/MethodLength

      # @return [Boolean] whether the table exists in the current database
      def check_if_table_exists(table_name)
        result = ActiveRecord::Base.connection.select_one <<~SQL
          SELECT COUNT(*) AS count
          FROM information_schema.tables
          WHERE table_schema = DATABASE() AND table_name = #{ActiveRecord::Base.connection.quote(table_name)}
        SQL
        result['count'].to_i.positive?
      end

      def create_cohort_member_indexes
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE INDEX idx_enrolled ON cdr_temp_cohort_members (date_enrolled)
        SQL
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE INDEX idx_earliest ON cdr_temp_cohort_members (earliest_start_date)
        SQL
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE INDEX idx_recorded ON cdr_temp_cohort_members (recorded_start_date)
        SQL
      end

      def create_cdr_other_patient_types
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE TABLE IF NOT EXISTS cdr_other_patient_types (
            patient_id INT(11) NOT NULL,
            site_id INT(11) NOT NULL,
            PRIMARY KEY (patient_id, site_id)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8
          PARTITION BY LIST(site_id) (#{partition_by_site})
        SQL
      end

      def create_cdr_reason_for_starting_art
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE TABLE IF NOT EXISTS cdr_reason_for_starting_art (
            patient_id INT(11) NOT NULL,
            site_id INT(11) NOT NULL,
            reason_for_starting_art INT(11) DEFAULT NULL,
            PRIMARY KEY (patient_id, site_id)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8
          PARTITION BY LIST(site_id) (#{partition_by_site})
        SQL
        create_cdr_reason_for_starting_art_indexes
      end

      def create_cdr_reason_for_starting_art_indexes
        ActiveRecord::Base.connection.execute <<~SQL
          CREATE INDEX idx_reason_for_starting_art ON cdr_reason_for_starting_art (reason_for_starting_art)
        SQL
      end

      # rubocop:disable Metrics/MethodLength

      # Final staging step: joins potential members with their first dispensation,
      # reason for starting ART and (optional) transfer-in date for one site.
      def cohort_members(location)
        ActiveRecord::Base.connection.execute <<~SQL
          INSERT INTO cdr_temp_cohort_members PARTITION (p#{location})
          SELECT
            pcm.patient_id,
            #{location},
            pcm.birthdate,
            pcm.birthdate_estimated,
            pcm.death_date,
            pcm.gender,
            mdo.start_date AS date_enrolled,
            COALESCE(ti.value_datetime, mdo.start_date) AS earliest_start_date,
            ti.value_datetime AS recorded_start_date,
            IF(pcm.birthdate IS NOT NULL, TIMESTAMPDIFF(YEAR, pcm.birthdate, COALESCE(ti.value_datetime, mdo.start_date)), NULL) AS age_at_initiation,
            IF(pcm.birthdate IS NOT NULL, TIMESTAMPDIFF(DAY, pcm.birthdate, COALESCE(ti.value_datetime, mdo.start_date)), NULL) AS age_in_days,
            rfsa.reason_for_starting_art
          FROM cdr_temp_potential_cohort_members PARTITION (p#{location}) pcm
          INNER JOIN cdr_temp_min_drug_orders PARTITION (p#{location}) mdo ON mdo.patient_id = pcm.patient_id
          INNER JOIN cdr_reason_for_starting_art PARTITION (p#{location}) AS rfsa ON rfsa.patient_id = pcm.patient_id
          LEFT JOIN cdr_temp_transfer_ins PARTITION (p#{location}) ti ON ti.patient_id = pcm.patient_id
          HAVING reason_for_starting_art IS NOT NULL
          ON DUPLICATE KEY UPDATE
            birthdate = VALUES(birthdate),
            birthdate_estimated = VALUES(birthdate_estimated),
            death_date = VALUES(death_date),
            gender = VALUES(gender),
            date_enrolled = VALUES(date_enrolled),
            earliest_start_date = VALUES(earliest_start_date),
            recorded_start_date = VALUES(recorded_start_date),
            age_at_initiation = VALUES(age_at_initiation),
            age_in_days = VALUES(age_in_days),
            reason_for_starting_art = VALUES(reason_for_starting_art)
        SQL
      end

      # Stages program patients with an "On antiretrovirals" state on or before the end
      # date, excluding those staged as external clients.
      def potential_cohort_members(location)
        ActiveRecord::Base.connection.execute <<~SQL
          INSERT INTO cdr_temp_potential_cohort_members PARTITION (p#{location})
          SELECT pp.patient_id, #{location}, p.birthdate, p.birthdate_estimated, p.death_date, LEFT(p.gender, 1) gender
          FROM patient_program PARTITION (p#{location}) AS pp
          INNER JOIN person PARTITION (p#{location}) AS p ON p.person_id = pp.patient_id AND p.voided = 0
          INNER JOIN patient_state PARTITION (p#{location}) AS ps ON ps.patient_program_id = pp.patient_program_id
            AND ps.voided = 0
            AND ps.state = 7 -- 7 is On antiretrovirals
            AND ps.start_date IS NOT NULL
            AND ps.start_date < (DATE(#{end_date}) + INTERVAL 1 DAY)
          WHERE pp.program_id = 1
            AND pp.voided = 0
            AND (pp.patient_id) NOT IN (SELECT patient_id FROM cdr_temp_external_clients PARTITION (p#{location}))
          GROUP BY pp.patient_id
          ON DUPLICATE KEY UPDATE
            birthdate = VALUES(birthdate),
            birthdate_estimated = VALUES(birthdate_estimated),
            death_date = VALUES(death_date),
            gender = VALUES(gender)
        SQL
      end

      # Stages each patient's earliest ARV dispensation date on or before the end date.
      # For patients listed in cdr_other_patient_types only orders placed after their
      # first "New patient" observation are considered.
      def min_drug_orders(location)
        ActiveRecord::Base.connection.execute <<~SQL
          INSERT INTO cdr_temp_min_drug_orders PARTITION (p#{location})
          SELECT o.patient_id, #{location}, DATE(MIN(o.start_date)) start_date
          FROM orders PARTITION (p#{location}) AS o
          INNER JOIN drug_order PARTITION (p#{location}) AS do ON do.order_id = o.order_id AND do.quantity > 0
          LEFT JOIN (
            SELECT o.person_id patient_id, #{location}, DATE(MIN(o.obs_datetime)) registered_date
            FROM obs PARTITION (p#{location}) AS o
            INNER JOIN cdr_other_patient_types PARTITION (p#{location}) AS other ON other.patient_id = o.person_id
            WHERE o.concept_id = 3289 -- Type of patient
              AND o.value_coded = 7572 -- New patient
              AND o.voided = 0
              AND o.obs_datetime < (DATE(#{end_date}) + INTERVAL 1 DAY)
            GROUP BY o.person_id
          ) np ON np.patient_id = o.patient_id
          WHERE o.voided = 0
            AND o.start_date < (DATE(#{end_date}) + INTERVAL 1 DAY)
            AND o.start_date > COALESCE(np.registered_date, DATE('1900-01-01'))
            AND o.concept_id IN (SELECT concept_id FROM concept_set WHERE concept_set = 1085) -- 1085 is ARV DRUGS
            AND o.order_type_id = 1
          GROUP BY o.patient_id
          ON DUPLICATE KEY UPDATE start_date = VALUES(start_date)
        SQL
      end

      # Stages the earliest recorded "Date antiretrovirals started" per patient, taken
      # from HIV clinic registration encounters on or before the end date.
      def transfer_ins(location)
        ActiveRecord::Base.connection.execute <<~SQL
          INSERT INTO cdr_temp_transfer_ins PARTITION (p#{location})
          SELECT o.person_id, #{location}, DATE(MIN(o.value_datetime)) value_datetime
          FROM obs PARTITION (p#{location}) AS o
          INNER JOIN encounter PARTITION (p#{location}) AS e ON e.patient_id = o.person_id
            AND e.encounter_id = o.encounter_id
            AND e.program_id = 1 -- HIV Program
            AND e.encounter_datetime < (DATE(#{end_date}) + INTERVAL 1 DAY)
            AND e.encounter_type = 9 -- HIV CLINIC REGISTRATION
            AND e.voided = 0
          WHERE o.concept_id = 2516 -- 2516 is Date antiretrovirals started
            AND o.voided = 0
            AND o.obs_datetime < (DATE(#{end_date}) + INTERVAL 1 DAY)
          GROUP BY o.person_id
          ON DUPLICATE KEY UPDATE value_datetime = VALUES(value_datetime)
        SQL
      end

      # Stages program patients that have patient-type observations, none of them
      # "New patient" (7572), and no HIV clinic registration encounter.
      def external_clients(location)
        ActiveRecord::Base.connection.execute <<~SQL
          INSERT INTO cdr_temp_external_clients PARTITION (p#{location})
          SELECT
            e.patient_id,
            #{location},
            GROUP_CONCAT(DISTINCT(patient_type.value_coded)) AS patient_types,
            clinic_registration.encounter_id
          FROM patient_program PARTITION (p#{location}) AS e
          INNER JOIN obs PARTITION (p#{location}) AS patient_type ON patient_type.person_id = e.patient_id
            AND patient_type.voided = 0
            AND patient_type.concept_id = 3289 -- Type of patient
            AND patient_type.obs_datetime < DATE(#{end_date}) + INTERVAL 1 DAY
          LEFT JOIN encounter PARTITION (p#{location}) AS clinic_registration ON clinic_registration.patient_id = e.patient_id
            AND clinic_registration.program_id = 1 -- HIV PROGRAM
            AND clinic_registration.encounter_type = 9 -- HIV CLINIC REGISTRATION
            AND clinic_registration.encounter_datetime < DATE(#{end_date}) + INTERVAL 1 DAY
            AND clinic_registration.voided = 0
          WHERE e.program_id = 1 -- HIV program
            AND e.voided = 0
            -- AND clinic_registration.encounter_id IS NOT NULL -- bone of contention
          GROUP BY e.patient_id
          HAVING FIND_IN_SET('7572', patient_types) = 0 AND encounter_id IS NULL
          ON DUPLICATE KEY UPDATE patient_types = VALUES(patient_types)
        SQL
      end

      # Stages patients having at least one patient-type observation other than
      # "New patient" on or before the end date.
      def cdr_other_patient_types(location)
        ActiveRecord::Base.connection.execute <<~SQL
          INSERT INTO cdr_other_patient_types PARTITION (p#{location})
          SELECT o.person_id, #{location}
          FROM obs PARTITION (p#{location}) AS o
          WHERE o.concept_id = 3289 -- Type of patient
            AND o.value_coded != 7572 -- New patient
            AND o.voided = 0
            AND o.obs_datetime < (DATE(#{end_date}) + INTERVAL 1 DAY)
          GROUP BY o.person_id
          -- on duplicate just ignore
          ON DUPLICATE KEY UPDATE site_id = VALUES(site_id)
        SQL
      end

      # Stages a reason-for-starting-ART observation (concept 7563) for each potential
      # cohort member, using a self anti-join to drop rows that have a later one.
      #
      # NOTE(review): the end-date filter on `a` lives in the LEFT JOIN's ON clause, not
      # in WHERE, so observations dated after the end date appear not to be excluded -
      # confirm whether that is intended before changing the query.
      def reason_for_starting_art(location)
        ActiveRecord::Base.connection.execute <<~SQL
          INSERT INTO cdr_reason_for_starting_art PARTITION (p#{location})
          SELECT a.person_id, #{location}, a.value_coded
          FROM obs PARTITION (p#{location}) a
          INNER JOIN cdr_temp_potential_cohort_members PARTITION (p#{location}) AS ct ON ct.patient_id = a.person_id
          LEFT OUTER JOIN obs PARTITION (p#{location}) AS b ON a.person_id = b.person_id
            AND b.concept_id = a.concept_id
            AND b.concept_id = 7563
            AND a.obs_datetime < b.obs_datetime
            AND b.voided = 0
            AND b.obs_datetime < DATE(#{end_date}) + INTERVAL 1 DAY
            AND a.obs_datetime < DATE(#{end_date}) + INTERVAL 1 DAY
          WHERE b.obs_id IS NULL AND a.concept_id = 7563 AND a.voided = 0
          GROUP BY a.person_id
          ON DUPLICATE KEY UPDATE reason_for_starting_art = VALUES(reason_for_starting_art)
        SQL
      end
      # rubocop:enable Metrics/MethodLength

      # Empties the staging tables and the status table for the requested sites.
      def clear_tables
        STAGING_TABLES.each { |table| clear_table(table) }
        clear_cohort_status
      end

      # Empties the status table for the requested sites.
      def clear_cohort_status
        clear_table('cdr_temp_cohort_status')
      end

      # Truncates the table when no sites were requested; otherwise deletes only the
      # rows that belong to the requested sites.
      def clear_table(table)
        statement = if locations.empty?
                      "TRUNCATE TABLE #{table}"
                    else
                      "DELETE FROM #{table} WHERE site_id IN (#{locations.join(',')})"
                    end
        ActiveRecord::Base.connection.execute(statement)
      end
    end
    # rubocop:enable Metrics/ClassLength
  end
end