require "csv"
require "open-uri"

module InstDataShipper
  module DataSources
    # Logic for triggering, monitoring, and processing Canvas reports.
    module CanvasReports
      extend ActiveSupport::Concern

      included do
        # Provision a shared worker pool for report processing when the dump batch
        # starts, and tear it down when the batch finalizes.
        hook :initialize_dump_batch do |context|
          report_processor_pool = CanvasSync::JobBatches::Pool.new(
            description: "HD #{export_genre} Export #{tracker.id} Canvas Report Pool",
            concurrency: 4,
            clean_when_empty: false,
          )
          context[:report_processor_pool] = report_processor_pool.pid
        end

        hook :finalize_dump_batch do
          if batch_context[:report_processor_pool]
            CanvasSync::JobBatches::Pool.from_pid(batch_context[:report_processor_pool]).cleanup_redis
          end
        end
      end

      public

      # Public entry points. Each defers its work into the shared report processor pool.
      def import_canvas_report(*args, **kwargs)
        _in_canvas_report_pool(:_import_canvas_report, *args, **kwargs)
      end

      def import_canvas_report_by_terms(*args, **kwargs)
        _in_canvas_report_pool(:_import_canvas_report_by_terms, *args, **kwargs)
      end

      def import_existing_report(report, **kwargs)
        delayed(:_process_canvas_report, report: report, **kwargs)
      end

      private

      # Triggers one copy of the report per term (or per given term) inside a
      # dedicated Sidekiq batch, so failed runs can be retried within that batch.
      def _import_canvas_report_by_terms(report_name, terms: [], params: {}, **kwargs)
        term_ids = (terms || []).map do |term|
          term.is_a?(Term) ? term.canvas_id : term
        end

        table_def = lookup_table_schema!(kwargs[:schema_name], report_name)
        _resolve_report_incremental_parameters(table_def, params)

        Sidekiq::Batch.new.tap do |b|
          b.description = "Term Scoped #{report_name} Runners"
          b.context = {
            report_bid: b.bid,
          }
          b.jobs do
            terms_query = term_ids.present? ? Term.where(canvas_id: term_ids) : Term
            terms_query.find_each do |t|
              _in_canvas_report_pool(:_trigger_canvas_report, report_name, params: { **params, enrollment_term_id: t.canvas_id }, **kwargs)
            end
          end
        end
      end

      def _import_canvas_report(report_name, params: {}, **kwargs)
        table_def = lookup_table_schema!(kwargs[:schema_name], report_name)
        _resolve_report_incremental_parameters(table_def, params)
        _trigger_canvas_report(report_name, params: params, **kwargs)
      end

      # Starts the report via the Canvas API, then enqueues a waiter that hands the
      # finished report to _process_canvas_report or, on failure, to
      # _handle_failed_canvas_report with the remaining retry budget.
      def _trigger_canvas_report(report_name, retry_count: 3, params: {}, **kwargs)
        report = canvas_sync_client.start_report(
          'self', report_name,
          parameters: params,
        )

        CanvasSync::Jobs::CanvasProcessWaiter.perform_later(
          "/api/v1/accounts/self/reports/#{report_name}/#{report[:id]}",
          {
            job: Jobs::AsyncCaller,
            args: [origin_class, :_process_canvas_report],
            kwargs: kwargs,
          },
          on_failure: {
            job: Jobs::AsyncCaller,
            args: [origin_class, :_handle_failed_canvas_report, report_name, kwargs],
            kwargs: { retry_count: retry_count },
          },
          status_key: :status,
          progress_as: :report,
        )
      end

      def _in_canvas_report_pool(mthd, *args, **kwargs)
        pool = CanvasSync::JobBatches::Pool.from_pid(batch_context[:report_processor_pool])
        Jobs::AsyncCaller.call_from_pool(pool, self.class, mthd, *args, **kwargs)
      end

      # Downloads the finished report and streams each CSV row through the table's
      # column blocks into the upload.
      def _process_canvas_report(report:, schema_name: nil)
        table_def = lookup_table_schema!(schema_name, report[:report])

        IO.copy_stream(URI.parse(report['attachment']['url']).open, "#{working_dir}/temp_report.csv")

        upload_data(table_def, extra: report['id']) do |file|
          CSV.foreach("#{working_dir}/temp_report.csv", headers: true) do |row|
            file << table_def[:columns].map { |c| instance_exec(row, &c[:block]) }
          end
        end
      end

      # Applies the table's incremental scope (if any) to the report parameters.
      def _resolve_report_incremental_parameters(table_def, params)
        if table_is_incremental?(table_def)
          inc = table_def[:incremental]
          scope = inc[:scope]

          if scope != false
            scope ||= "updated_after"

            if scope.is_a?(Proc)
              scope = instance_exec(params, &scope)
              params.merge!(scope) if scope.is_a?(Hash) && scope != params
            elsif scope.is_a?(String) || scope.is_a?(Symbol)
              params[scope] = incremental_since
            end
          end
        end

        params
      end
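
      # For reference, a sketch of the `incremental: { scope: ... }` shapes the
      # method above handles (inferred from its branch logic; the schema DSL itself
      # is defined elsewhere in the gem):
      #
      #   scope: false               # skip incremental parameters entirely
      #   scope: "updated_after"     # the default; sets params["updated_after"] = incremental_since
      #   scope: ->(params) { ... }  # may return a Hash, which is merged into params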
      # Re-triggers a failed report within the originating batch while retries
      # remain; otherwise aborts the dump.
      def _handle_failed_canvas_report(report_name, kwargs, retry_count:, report:)
        if retry_count.positive?
          tbid = batch_context[:report_bid] || batch_context[:root_bid]
          Sidekiq::Batch.new(tbid).jobs do
            _in_canvas_report_pool(:_trigger_canvas_report, report_name, retry_count: retry_count - 1, **kwargs.symbolize_keys)
          end
        else
          # TODO: Allow marking the table as incomplete. Destination code could then
          # decide how to handle incomplete tables (incremental imports, for example,
          # can tolerate a missed run).
          cleanup_fatal_error!
        end
      end
    end
  end
end
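
# Usage sketch (hypothetical): this assumes a dumper assembled with the gem's
# `InstDataShipper::Dumper.define` DSL; `SCHEMA`, the report names, and
# `Term.processable` are illustrative placeholders.
#
#   Dumper = InstDataShipper::Dumper.define(schema: SCHEMA, include: [
#     InstDataShipper::DataSources::CanvasReports,
#   ]) do
#     import_canvas_report("provisioning_csv", params: { "users" => true })
#     import_canvas_report_by_terms("provisioning_csv", terms: Term.processable)
#   end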