module InstDataShipper
  module DataSources
    # This module contains the logic for processing Canvas reports
    #
    # Mixed into dumper classes via ActiveSupport::Concern. Public helpers
    # start Canvas account reports and import their CSV output into warehouse
    # tables; the heavy lifting is funneled through a bounded CanvasSync job
    # pool so report processing is throttled independently of the rest of the
    # dump batch.
    module CanvasReports
      extend ActiveSupport::Concern

      included do
        # When the dump batch starts, create a bounded worker pool for report
        # processing and stash its pid in the batch context so later jobs
        # (possibly on other workers) can rehydrate it from Redis.
        hook :initialize_dump_batch do |context|
          report_processor_pool = CanvasSync::JobBatches::Pool.new(
            description: "HD #{export_genre} Export #{tracker.id} Canvas Report Pool",
            concurrency: 4,
            # Keep the pool alive while idle; it is torn down explicitly in
            # the finalize hook below.
            clean_when_empty: false,
          )
          context[:report_processor_pool] = report_processor_pool.pid
        end

        # Release the pool's Redis state once the dump batch has finished.
        hook :finalize_dump_batch do
          if batch_context[:report_processor_pool]
            CanvasSync::JobBatches::Pool.from_pid(batch_context[:report_processor_pool]).cleanup_redis
          end
        end
      end

      public

      # Start a Canvas report and import its output into a warehouse table,
      # scheduling the work through the shared report-processor pool.
      # Accepts the same arguments as _import_canvas_report.
      def import_canvas_report(*args, **kwargs)
        _in_canvas_report_pool(:_import_canvas_report, *args, **kwargs)
      end

      # Run +report_name+ once per enrollment term, merging the term's
      # canvas_id into the report params as enrollment_term_id.
      #
      # @param target_table [String] warehouse table to import into
      # @param report_name [String] Canvas report identifier
      # @param terms [Array<Term, Integer>] terms (or canvas ids) to scope to;
      #   empty/nil means every term in the Term table
      # @param params [Hash] base report parameters shared by every term run
      def import_canvas_report_by_terms(target_table, report_name, terms: [], params: {}, **kwargs)
        # Normalize to raw canvas ids; callers may pass Term records or ids.
        term_ids = (terms || []).map do |term|
          term.is_a?(Term) ? term.canvas_id : term
        end

        Sidekiq::Batch.new.tap do |b|
          b.description = "Term Scoped #{report_name} Runners"
          b.context = {
            # Recorded so _handle_failed_canvas_report can re-enqueue retries
            # into this batch rather than the root batch.
            report_bid: b.bid,
          }
          b.jobs do
            terms_query = term_ids.present? ? Term.where(canvas_id: term_ids) : Term
            terms_query.find_each do |t|
              import_canvas_report(target_table, report_name, params: { **params, enrollment_term_id: t.canvas_id }, **kwargs)
            end
          end
        end
      end

      # Import an already-generated Canvas report (skips start_report/waiting).
      def import_existing_report(table, report)
        delayed(:_process_canvas_report, table, report: report)
      end

      private

      # Kick off +report_name+ via the Canvas API and enqueue a waiter job
      # that polls it, then dispatches to _process_canvas_report on success or
      # _handle_failed_canvas_report on failure.
      #
      # @param retry_count [Integer] remaining retries granted to the failure
      #   handler for this report
      # @param params [Hash] report parameters passed to the Canvas API
      def _import_canvas_report(target_table, report_name, retry_count: 3, params: {}, **kwargs)
        report = canvas_sync_client.start_report(
          'self', report_name,
          parameters: params,
        )
        CanvasSync::Jobs::CanvasProcessWaiter.perform_later(
          "/api/v1/accounts/self/reports/#{report_name}/#{report[:id]}",
          {
            instance_of: origin_class,
            method: :_process_canvas_report,
            args: [target_table],
            kwargs: kwargs,
          },
          on_failure: {
            instance_of: origin_class,
            method: :_handle_failed_canvas_report,
            # BUGFIX: include params in the forwarded kwargs hash. Previously
            # only the residual kwargs were forwarded, so a retry dropped the
            # report parameters — e.g. a term-scoped report would be retried
            # unscoped, producing a full-account report instead.
            args: [target_table, report_name, { params: params, **kwargs }],
            kwargs: { retry_count: retry_count },
          },
          status_key: :status,
          # Waiter is expected to hand the finished report object to the
          # handler as the report: kwarg (per CanvasSync's progress_as).
          progress_as: :report,
        )
      end

      # Schedule +mthd+ on this dumper through the batch's report-processor
      # pool (created in the initialize_dump_batch hook).
      def _in_canvas_report_pool(mthd, *args, **kwargs)
        pool = CanvasSync::JobBatches::Pool.from_pid(batch_context[:report_processor_pool])
        AsyncCaller.call_from_pool(pool, self.class, mthd, *args, **kwargs)
      end

      # Download a finished report's CSV attachment and upload each row into
      # the warehouse table, applying per-column transformers when present.
      #
      # @param table [String] warehouse table name (matched against
      #   table_schemas by :warehouse_name)
      # @param report [Hash] Canvas report payload; must contain
      #   ['attachment']['url'] and ['id']
      def _process_canvas_report(table, report:)
        table_def = table_schemas.find { |t| t[:warehouse_name].to_s == table }

        IO.copy_stream(URI.parse(report['attachment']['url']).open, "#{working_dir}/#{table}.csv")

        inner_block = ->(file) {
          CSV.foreach("#{working_dir}/#{table}.csv", headers: true) do |m|
            file << table_def[:columns].map do |c|
              # Transformers run with the CSV::Row as self; otherwise read the
              # column straight out of the row by its source header name.
              c[:transformer].present? ? m.instance_exec(&c[:transformer]) : m[c[:local_name].to_s]
            end
          end
        }

        upload_data(table_def, extra: report['id'], &inner_block)
      end

      # Failure handler for CanvasProcessWaiter: retry the report (with its
      # original kwargs, including params) while retries remain, preferring
      # the term-scoped batch when one exists; otherwise abort the dump.
      def _handle_failed_canvas_report(table, report_name, kwargs, retry_count:, report:) # rubocop:disable Lint/UnusedMethodArgument
        if retry_count.positive?
          tbid = batch_context[:report_bid] || batch_context[:root_bid]
          Sidekiq::Batch.new(tbid).jobs do
            # kwargs round-tripped through the job payload, so keys may have
            # been stringified — restore symbols before splatting.
            import_canvas_report(table, report_name, retry_count: retry_count - 1, **kwargs.symbolize_keys)
          end
        else
          cleanup_fatal_error!
        end
      end
    end
  end
end