module GiJob
  module Jobs
    module Concerns
      module JobRunner
        extend ActiveSupport::Concern

        included do
          # begin included
        end # end included

        class_methods do
          # begin class_methods
        end # end class_methods

        ##################### Job execution

        # Entry point called by the job backend: installs signal traps, then delegates to perform_impl.
        def perform(args)
          set_signal_trap
          perform_impl(**args)
        end

        def perform_impl(**args)
          _perform_default(**args)
        end

        # Default implementation: restores counters from the transaction record,
        # then runs the job body with error handling.
        def _perform_default(**args)
          @_gi_job_transaction = args[:gi_job_transaction]
          # GiJob.logger.ap({_perform_default: {@_gi_job_transaction: @_gi_job_transaction}})
          @_process_info = {
            error: @_gi_job_transaction.counts_errord != nil && 0 < @_gi_job_transaction.counts_errord,
            counts: {
              all: @_gi_job_transaction.counts_all || 0,
              progress: @_gi_job_transaction.counts_progress || 0,
              completed: @_gi_job_transaction.counts_completed || 0,
              errord: @_gi_job_transaction.counts_errord || 0,
            }
          }
          # GiJob.logger.ap({_perform_default: {@_process_info: @_process_info}})
          begin
            run_job do
              _suspendable = self.class.const_defined?(:GI_JOB_SUSPENDABLE) && self.class::GI_JOB_SUSPENDABLE
              process_status_transaction(suspendable: _suspendable) do
                run_job_impl(
                  gi_job_transaction: @_gi_job_transaction,
                  parameter: @_gi_job_transaction.parameter
                )
              end
            end
          rescue => e
            JobUtils.logging_fatal(e)
            @_gi_job_transaction.status_error!
            @_gi_job_transaction.append_log(
              level: :error,
              division: "#{e.class.name}",
              description: "An internal error occurred during processing."
            )
          end
        end

        # Wraps the job body: marks the transaction as running, captures errors,
        # finalizes the status and counters, then notifies and runs post-processing.
        def run_job
          # GiJob.logger.ap({run_job: {args: args}})
          time_start = Time.zone.now
          result = {}
          begin
            @_gi_job_transaction.status_running!
            result = yield(@_gi_job_transaction)
          rescue => e
            GiJob.logger.fatal(e.message)
            GiJob.logger.fatal(e.backtrace.join("\n"))
            @_process_info[:error] = true
            @_gi_job_transaction.status_error!
            counts_progress = @_process_info[:counts][:progress]
            message = "An internal error occurred while processing item #{counts_progress}."
            @_gi_job_transaction.append_log(
              level: :error,
              division: "#{e.class.name}",
              description: message,
            )
          end
          GiJob.logger.ap({JOB: "done #{self.class.name}"})

          if @_gi_job_transaction.process_status_suspended? || @_gi_job_transaction.status_ended?
            # Do not change the status in the following cases:
            # - the job was suspended partway through
            # - an end status was already set partway through
          else
            if @_process_info[:error]
              @_gi_job_transaction.status_error!
            else
              @_gi_job_transaction.status_completed!
            end
          end

          syms = @_process_info[:counts].keys
          update_counts_by_process_info(syms)

          _slack_notifier = self.class.const_defined?(:GI_JOB_SLACK_NOTIFIER) && self.class::GI_JOB_SLACK_NOTIFIER
          if _slack_notifier && @_gi_job_transaction.parameter[:_dont_notify].blank?
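            # NOTE: notifier_slack with this signature is not defined in the active code of this
            # concern (only an older commented-out version exists below); it is assumed to be
            # provided by the including job class or another concern.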
            # Notification
            notifier_slack(gi_job_transaction: @_gi_job_transaction, time_start: time_start, result: result)
          end

          # Post-processing
          post_process_result = self.class.job_process_call(process_name: "post-processing", gi_job_transaction: @_gi_job_transaction) do |**args|
            self.class.job_post_process_impl(args)
          end
        end

        # Hook for derived classes: the actual job body.
        def run_job_impl(gi_job_transaction:, parameter:)
          # Override in derived classes
          GiJob.logger.warn("gi_job_transaction.id: #{gi_job_transaction.id}, ...not implemented run_job_impl in #{self.class.name}")
        end

        ################### Counts

        # Increment the in-memory counter and flush all counters to the DB
        # once the accumulated difference reaches the threshold.
        def increment_counts(sym = :progress, threshold: 1)
          @_process_info[:counts][sym] += 1
          record_sym = "counts_#{sym}".to_sym
          # GiJob.logger.ap({method: "increment_counts", threshold: threshold, record_sym: record_sym, record_value: @_gi_job_transaction[record_sym]})
          if ((@_gi_job_transaction[record_sym] || 0) + (threshold || 1)) <= @_process_info[:counts][sym]
            # With a threshold of 1000, the DB is updated once every 1000 items.
            # When updating, update all counters while we are at it.
            syms = @_process_info[:counts].keys
            update_counts_by_process_info(syms)
          end
        end

        def update_counts(**counts_hash)
          # GiJob.logger.ap({method: "update_counts", counts_hash: counts_hash})
          syms = []
          counts_hash.each do |sym, value|
            @_process_info[:counts][sym] = value
            syms << sym
          end
          # GiJob.logger.ap({method: "update_counts", syms: syms})
          update_counts_by_process_info(syms)
        end

        def update_counts_by_process_info(syms)
          param = {}
          syms.each do |sym|
            value = @_process_info[:counts][sym]
            record_sym = "counts_#{sym}".to_sym
            param[record_sym] = value
            # GiJob.logger.ap({method: "update_counts_by_process_info", sym: sym, record_sym: record_sym, value: value, param: param})
          end
          # GiJob.logger.ap({method: "update_counts_by_process_info", param: param})
          @_gi_job_transaction.update!(param)
        end

        #
        # def self.each_job_file_csv(job, headers: false)
        #   job_file = job.job_files.present? ? job.job_files.first : nil
        #   if job_file
        #     data_source = job_file.data_source
        #     # GiJob.logger.ap({data_source: data_source})
        #
        #     encoding = Getit::CsvUtils.create_encoding_param(data_source)
        #     # GiJob.logger.ap({encoding: encoding})
        #
        #     csv = CSV.new(data_source, encoding: encoding, headers: headers)
        #     row_num = 0
        #     csv.each do |row|
        #       row_num += 1
        #       yield(row, row_num)
        #     end
        #   end
        # end
        #
        # def increment_progress(job, need_update = true)
        #   # Keep the processed count on the instance so it can be logged when an exception is rescued
        #   if @tmp_counts_progress.nil?
        #     @tmp_counts_progress = job.counts_progress || 0
        #   end
        #   @tmp_counts_progress += 1
        #   # On file_upload the counts are updated at the very end, so need_update = false is expected
        #   update_counts_progress(job, @tmp_counts_progress) if need_update
        # end
        #
        # def update_counts_progress(job, num)
        #   job.update!({counts_progress: num || 0})
        #   @tmp_counts_progress = num
        # end
        #
        # def append_error_with(job, tag, message)
        #   job.append_error(tag, message)
        #   @has_error = true
        # end
        #
        #
        # def self.save_job_file!(job, additional_info)
        #   # If an uploaded file is present, store it in the DB
        #   if additional_info.has_key?(:uploaded_file)
        #     # Build a temporary file path
        #     tmp_file_path = self.create_file_path(job, additional_info[:uploaded_file].original_filename)
        #
        #     # Save it to a temporary file first because it is read twice
        #     file_copy(additional_info[:uploaded_file], tmp_file_path)
        #
        #     # Save the file on the record
        #     encoding = Getit::CsvUtils.create_encoding_param(File.new(tmp_file_path))
        #     mode = "rt:#{encoding}"
        #     # GiJob.logger.ap({mode: mode})
        #     tmp_file = File.open(tmp_file_path, mode = mode).read
        #     job_file = JobFile.new({
        #       job: job,
        #       file: tmp_file,
        #       file_name: additional_info[:uploaded_file].original_filename,
        #       counts_row: tmp_file.count("\n"),
        #     })
        #     job.update!({
        #       job_files: [job_file],
        #     })
        #
        #     # Delete it because something like ActionDispatch::File cannot be passed to the job
        #     additional_info.delete(:uploaded_file)
        #
        #     # Delete old temporary files
        #     self.delete_old_files
        #   end
        # end
        #
        # def self.relation_master_schedule_job!(job, master_schedule_job)
        #   if master_schedule_job.present? && job.respond_to?(:master_schedule_job)
        #     # Associate the master_schedule_job
        #     job.master_schedule_job = master_schedule_job
        #
        #     if master_schedule_job.additional_info.present?
        #       # Carry over the additional_info set on the master_schedule_job
        #       master_schedule_job_additional_info = master_schedule_job.additional_info
        #       if master_schedule_job.additional_info[:next_one].present?
        #         next_one = master_schedule_job.additional_info[:next_one].delete
        #         master_schedule_job.save!
        #         master_schedule_job_additional_info.merge!(next_one)
        #       end
        #       job.additional_info.merge!(master_schedule_job.additional_info)
        #       job.save!
        #     end
        #
        #     master_schedule_job.last_job = job
        #     master_schedule_job.save!
        #   end
        #
        #   job.save!
        # end
        #
        # def self.create_file_path(job, file_name_suffix = ".csv")
        #   file_name_prefix = "#{job.job_type}_" || ""
        #   "#{self.tmp_dir}/job_file_#{file_name_prefix}#{job.id}#{file_name_suffix}"
        # end
        #
        # def self.delete_old_files(time_limit = 14.days)
        #   # Delete temporary files older than two weeks
        #   tmp_dir = self.tmp_dir
        #   time_now = Time.now
        #   threshold = time_now - time_limit
        #   GiJob.logger.ap("delete_old_file #{time_now} - #{time_limit} = threshold: #{threshold}")
        #   deleted_paths = []
        #   Dir.glob("#{tmp_dir}/job_file_*").select do |file_path|
        #     File.mtime(file_path) < threshold
        #   end.each do |file_path|
        #     GiJob.logger.ap("delete_old_file: #{file_path}")
        #     FileUtils.rm(file_path)
        #     deleted_paths << file_path
        #   end
        #   deleted_paths
        # end
        #
        # def self.tmp_dir
        #   tmp_dir = "#{Dir.tmpdir}/job"
        #   unless File.exist?(tmp_dir)
        #     FileUtils.mkdir_p(tmp_dir)
        #   end
        #   tmp_dir
        # end
        #
        # def self.file_copy(file, dst)
        #   GiJob.logger.ap({file: file, dst: dst})
        #   FileUtils.cp(file.path, dst)
        #   # GiJob.logger.ap({rrr: file.read})
        #   # File.open(dst, 'w') do |fp|
        #   #   fp.write(file.read)
        #   # end
        # end
        #
        # def self.file_create(content, dst)
        #   File.open(dst, 'w') do |fp|
        #     fp.write(content)
        #   end
        # end
        #
        # #### Slack notification
        # def notifier_slack(job:, time_start:, time_end: Time.zone.now, job_rc: {})
        #   timestamp_hash = create_timestamp_hash(
        #     time_start: time_start,
        #     time_end: time_end,
        #     counts_progress: job.counts_progress
        #   )
        #   attachments = []
        #   attachments << create_slack_common_attachment(job, timestamp_hash)
        #   slack_notifier_info = notifier_slack_imple(
        #     attachments: attachments,
        #     job: job,
        #     timestamp_hash: timestamp_hash,
        #     job_rc: job_rc
        #   )
        #
        #   if slack_notifier_info.instance_of?(Array)
        #     slack_notifier_info.each do |info|
        #       slack_api_wrapper = SlackApiWrapper.new(info[:initialize_params] || {})
        #       slack_api_wrapper.notifier(info[:notifier_params])
        #     end
        #   else
        #     slack_api_wrapper = SlackApiWrapper.new
        #     slack_api_wrapper.notifier(slack_notifier_info)
        #   end
        # end
        #
        # def create_slack_common_attachment(job, timestamp_hash)
        #   job_info_string = "#{job.shop.shopify_domain} (#{job.shop.id}) #{job.enum_localize(:job_type)} (#{job.id})"
        #   GiJob.logger.info("[JOB-INFO] #{job_info_string} End #{timestamp_hash[:time_description]} #{timestamp_hash[:processing_count_per_second]}")
        #
        #   main_color = job.error? ? "danger" : "good"
        #   text_string = "Processed: #{job.counts_progress} / #{job.counts_all}"
        #   text_string += "\n#{timestamp_hash[:timestamp_text]}"
        #
        #   if job.process_status_suspended?
        #     text_string += "\nSuspended at item #{job.process_progress}"
        #   elsif job.process_status_terminated?
        #     text_string += "\nTerminated at item #{job.process_progress}"
        #   end
        #
        #   job_logs_hash = job.create_job_logs_hash
        #   if 0 < job_logs_hash[:error].size
        #     main_color = "warning"
        #     text_string += "\nError count: #{job_logs_hash[:error].size}"
        #     text_string += "\nFirst error message: #{job_logs_hash[:error].first.description}"
        #   end
        #
        #   attachment = {
        #     title: "#{job_info_string}",
        #     color: main_color,
        #     text: "#{text_string}",
        #   }
        #   attachment
        # end
        #
        # def notifier_slack_imple(
        #   attachments: [],
        #   job:,
        #   timestamp_hash: {},
        #   job_rc: {}
        # )
        #   # Extend in derived classes if necessary
        #   create_simple_slack_notifer(
        #     attachments: attachments,
        #     job: job,
        #     timestamp_hash: timestamp_hash,
        #     job_rc: job_rc
        #   )
        # end
        #
        # def create_timestamp_hash(time_start:, time_end: Time.zone.now, counts_progress: nil)
        #   time_format = '%Y/%m/%d %H:%M:%S'
        #   time_elapsed = (time_end - time_start).round(2) # elapsed time
        #   time_description = "#{time_start.strftime(time_format)} ~ #{time_end.strftime(time_format)} (#{time_elapsed}s)" # start and end (elapsed time)
        #   processing_count_per_second = "#{(counts_progress && 0 < counts_progress) ? (counts_progress / time_elapsed).round(2) : "-"} items/s" # processed count (per second)
        #   timestamp_text = "Execution time: #{time_description}\nProcessing speed: #{processing_count_per_second}"
        #
        #   {
        #     time_start: time_start,
        #     time_end: time_end,
        #     time_elapsed: time_elapsed,
        #     counts_progress: counts_progress,
        #     time_description: time_description,
        #     processing_count_per_second: processing_count_per_second,
        #     timestamp_text: timestamp_text,
        #   }
        # end
        #
        # def create_simple_slack_notifer(
        #   attachments: [],
        #   job:,
        #   timestamp_hash: {},
        #   job_rc: {}
        # )
        #   counts_hash = job_rc.is_a?(Hash) ? job_rc[:counts_hash] : {}
        #   GiJob.logger.ap({counts_hash: counts_hash})
        #   text = "#{job.to_simple[:local][:job_type]} - #{timestamp_hash[:time_start].strftime('%-m/%-d %-H:%M')}"
        #
        #   job_counts_text = nil
        #   color = nil
        #   if counts_hash.present?
        #     job_counts_text = "Processed: #{counts_hash[:counts]} items"
        #     color = "good"
        #   else
        #     job_counts_text = "Could not retrieve the job result"
        #     color = "warning"
        #   end
        #
        #   info_links = job.job_log_files.map {|job_log_file| job_log_file.slack_link}
        #
        #   tmp_attachments = [{
        #     title: "Processing details",
        #     color: color,
        #     text: "#{job_counts_text}",
        #   }, {
        #     title: "Log files",
        #     color: "good",
        #     text: "#{info_links.join("\n")}",
        #   }]
        #
        #   if counts_hash.present? && counts_hash[:errors].present?
        #     tmp_attachments << {
        #       title: "Errors",
        #       color: "danger",
        #       text: "#{counts_hash[:errors].join("\n")}",
        #     }
        #   end
        #
        #   attachments |= tmp_attachments
        #   {text: text, attachments: attachments}
        # end
        #

        ##################### Signal trapping & suspended

        def set_signal_trap
          @_is_signal_trapped = false
          Signal.trap('TERM') do
            # Signal received
            GiJob.logger.info('trap TERM!')
            @_is_signal_trapped = true
          end
          Signal.trap('TSTP') do
            # Signal received
            GiJob.logger.info('trap TSTP!')
            @_is_signal_trapped = true
          end
        end

        def process_status_transaction(
          gi_job_transaction: @_gi_job_transaction,
          suspendable: true
        )
          # If suspendable is true, the job is marked as suspended;
          # if suspendable is false, it is marked as terminated.
          begin
            yield
          rescue JobCommandSuspended => e
            # GiJob.logger.ap({rescue: e})
            message = e.message
            info_string = "Terminated while processing item #{@_process_info[:counts][:progress]} due to: #{message}"
            process_status = :terminated
            if suspendable
              info_string = "Suspended while processing item #{@_process_info[:counts][:progress]} due to: #{message}"
              process_status = :suspended
            end
            gi_job_transaction.append_log_info(division: "job stop", description: "#{info_string}")
            gi_job_transaction.update!({
              process_status: process_status,
              process_progress: e.process_progress
            })
          end
        end

        def check_job_command!
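          # Raise JobCommandSuspended when a TERM/TSTP signal has been trapped,
          # carrying the current @_process_progress so the job can be resumed later.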
          if @_is_signal_trapped
            # A job stop has been requested
            raise JobCommandSuspended.new(@_process_progress || nil), "Server stop requested"
          end
        end

        # Wraps a unit of work so it can be skipped on resume and interrupted on a stop request.
        def with_job_command(process_progress_key: nil)
          if process_progress_key.present?
            @_process_progress = {key: process_progress_key}
          end
          check_job_command!

          # Decide whether to skip
          if @_gi_job_transaction.process_progress.present? && @_process_progress.present?
            # If part of the work was already processed, skip up to that point
            suspended_key = @_gi_job_transaction.process_progress.try(:[], :key)
            now_key = @_process_progress.try(:[], :key)
            if process_progress_key == suspended_key
              # The key matches the one that was in progress, so resume here
              # and clear process_progress
              @_gi_job_transaction.update!(process_progress: nil)
            else
              # Already processed, so skip
              GiJob.logger.ap("Already processed, skipping. process_progress_key: #{process_progress_key} != suspended_key: #{suspended_key}")
              return nil
            end
          end
          yield
        end

        # Sleep for the given number of seconds, waking up once a second to check
        # for stop requests; an optional block can end the wait early.
        def sleep_as_check_job_command(seconds, &block)
          sleep_seconds = seconds
          sleep_loop_max_counts = 1
          if 1 < seconds
            sleep_seconds = 1
            sleep_loop_max_counts = seconds.ceil
          end
          sleep_loop_max_counts.times do |index|
            GiJob.logger.ap({method: "sleep_as_check_job_command", loops: "#{index}/#{sleep_loop_max_counts}"})
            # Sleep in small chunks while checking for job commands
            check_job_command!
            sleep(sleep_seconds)
            if block_given?
              # If a block is given, evaluate it and break out early when it returns a value
              rc = yield
              break if rc.present?
            end
          end
        end
      end # end JobRunner
    end
  end
end
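
# Illustrative usage (a sketch, not part of the concern): how a job class might include
# JobRunner and override run_job_impl. The class name MyExampleJob, the ApplicationJob
# base class, and the shape of `parameter` are assumptions for illustration only; real
# job classes in this codebase may be wired differently.
#
# class MyExampleJob < ApplicationJob
#   include GiJob::Jobs::Concerns::JobRunner
#
#   GI_JOB_SUSPENDABLE = true      # TERM/TSTP suspends the job instead of terminating it
#   GI_JOB_SLACK_NOTIFIER = false  # skip Slack notification
#
#   def run_job_impl(gi_job_transaction:, parameter:)
#     items = parameter[:items] || []
#     update_counts(all: items.size)
#     items.each do |item|
#       with_job_command(process_progress_key: item[:id]) do
#         # ... process one item ...
#         increment_counts(:progress, threshold: 100) # flush counters to the DB about every 100 items
#       end
#     end
#   end
# end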