require 'sidekiq/api' namespace :gi_job_task do desc "job_status が更新されずにプロセスが無くなってしまったジョブの job_status を errorにする" task :terminate_ghost_jobs => :environment do # puts "start terminate_ghost_jobs" time_now = Time.now no_reaction_minutes = 30 # 30分間更新されていない未完了のジョブ jobs = Job.ghosts(no_reaction_minutes) counts_target_jobs = jobs.size counts_terminated = 0 jobs.each do |job| # Rails.logger.ap({job: job}) # ジョブステータスを更新 job.append_error("強制停止", "ジョブが停止されました。再度実行してください。") job.update_columns({job_status: Job.job_statuses[:error]}) counts_terminated += 1 Rails.logger.fatal("terminate_ghost_job: job_id: #{job.id}") end puts "#{time_now.strftime('%Y/%m/%d %H:%M:%S')} end terminate_ghost_jobs #{counts_terminated}/#{counts_target_jobs}" end task :delete_old_tmp_files => :environment do time_now = Time.now # 一時ディレクトリ内の古いファイルを削除 # クラスタ環境下では一時ディレクトリのファイルはdistでは作成されないが、 # materialize_job_files で作成されたファイルを削除するために動かしている paths = Jobable.delete_old_files local_deleted_counts = 0 paths.each do |path| Rails.logger.info("delete_old_files: path: #{path}") local_deleted_counts += 1 end # 終了している古い job_files の file を削除する old_ended_has_data_sources_jobs = Job.old_ended.has_job_file_data_sources db_deleted_counts = 0 old_ended_has_data_sources_jobs.each do |job| job.job_files.each do |job_file| Rails.logger.info("job_file.delete_data_source: job_id: #{job.id}, job_file_id: #{job_file.id}") job_file.delete_data_source! db_deleted_counts += 1 end end puts "#{time_now.strftime('%Y/%m/%d %H:%M:%S')} end delete_old_tmp_files local: #{local_deleted_counts}, db: #{db_deleted_counts}" end ##### 状況確認のための task desc "job / sidekiq の状態を表示する" task :status => :environment do Rake::Task["gi_job_task:show_recently_jobs"].invoke Rake::Task["gi_job_task:show_job_statuses"].invoke Rake::Task["gi_job_task:show_sidekiq_statuses"].invoke end desc "直近10ジョブを表示する" task :show_recently_jobs => :environment do puts "- show recentry jobs" gi_job_transaction = GiJobTransaction.preload(:owner, :gi_job_logs).order(updated_at: :desc).limit(10) strings = gi_job_transaction.map(&:to_string_line) puts strings.join("\n") end desc "全ジョブのステータス集計を表示する" task :show_job_statuses => :environment do puts "- show jobs statuses" grouped = GiJobTransaction.group(:status, :process_status).count Rails.logger.ap({grouped: grouped}) display = grouped.map do |keys, counts| "#{sprintf("%-12s", keys[0])} | #{sprintf("%-12s", keys[1])}: #{sprintf("%12d", counts)}" end.join("\n") puts display end desc "sidekiqの状況を表示する" task :show_sidekiq_statuses => :environment do puts "- show sidekiq statuses" # Rails.logger.ap({task: "show sidekiq statuses"}) puts "実行中: #{Sidekiq::Workers.new.size} 件" Sidekiq::Workers.new.each do |process_id, thread_id, sidekiq_job| Rails.logger.ap({ process_id: process_id, thread_id: thread_id, sidekiq_job: sidekiq_job }) payload = sidekiq_job["payload"] payload_args = payload["args"][0] job_argument = payload_args["arguments"][0] strings = [] strings << "pid: #{process_id}" strings << "tid: #{thread_id}" strings << "sjid: #{payload_args["job_id"]}" if job_argument && job_argument["gi_job_transaction"] && job_argument["gi_job_transaction"]["_aj_globalid"] strings << "job_id: #{job_argument["gi_job_transaction"]["_aj_globalid"]}" end strings << "class: #{payload_args["job_class"]}" puts "[#{strings.join("][")}]" end display_waiting_proc = Proc.new do |sidekiq_job| Rails.logger.ap({ sidekiq_job: sidekiq_job }) payload = sidekiq_job.item payload_args = payload["args"][0] job_argument = payload_args["arguments"][0] strings = [] strings << "sjid: #{payload_args["job_id"]}" if job_argument && job_argument["gi_job_transaction"] && job_argument["gi_job_transaction"]["_aj_globalid"] strings << "job_id: #{job_argument["gi_job_transaction"]["_aj_globalid"]}" end strings << "class: #{payload_args["job_class"]}" puts "[#{strings.join("][")}]" end puts "キュー: #{Sidekiq::Queue.new.size} 件" Sidekiq::Queue.new.each do |sidekiq_job| display_waiting_proc.call(sidekiq_job) end puts "再試行: #{Sidekiq::RetrySet.new.size} 件" Sidekiq::RetrySet.new.each do |sidekiq_job| display_waiting_proc.call(sidekiq_job) end end task :change_maintenance, ['mode'] => :environment do |task, args| mode = args[:mode] if mode.blank? # modeが指定されなかった場合は現在のモードを表示して終了する maintenance = Maintenance.find_record puts "current mode: #{maintenance.mode}" else # mode変更 Maintenance.change_mode!(mode) if mode == "none" # suspendedされていたジョブを再起動する jobs = Job.suspended jobs.each do |job| job_class_string = job.job_type.classify Rails.logger.ap("class => #{job_class_string}") job_class = job_class_string.constantize # -> ModelName Rails.logger.ap({job_class: job_class}) job_class.start_job(job) end end end end task :materialize_job_files, ['job_id'] => :environment do |task, args| # dbに格納されたjob_fileを/tmpに作成する job_id = args[:job_id] job = Job.eager_load(:job_files).find_by(id: job_id) job.job_files.each_with_index do |job_file, index| file_name_suffix = "#{index == 0 ? "" : "_#{index.to_s}"}_#{job_file.file_name}" file_path = Jobable.create_file_path(job, file_name_suffix) Jobable.file_create(job_file.data_source, file_path) puts "create_file: #{file_path}" end end end