module CensorBear
  # Value object describing the outcome of one censorship pass.
  class Result
    attr_accessor :is_mod, :mod_words, :content, :labels

    # @param content [String, nil] the (possibly filtered) text; nil is stored as ''
    # @param is_mod [Boolean] whether the text was flagged for moderation
    # @param mod_words [Array<String>] words that triggered a rule
    # @param labels [Array<String>] labels reported by the cloud check
    def initialize(content, is_mod = false, mod_words = [], labels = [])
      @is_mod = is_mod
      @mod_words = mod_words
      @labels = labels
      @content = content || ''
    end
  end

  # Runs a piece of text through the local stop-word list and, when the local
  # pass did not already flag it, through the AliyunGreen text scan.
  class Censor
    # Contact-number patterns. Only referenced by the disabled regex filter in
    # #check_text; kept so that filter can be re-enabled.
    QQ_REG = /(?:[加茄qQ企鹅号码\s]{1,}|[群号]{1,}|[叩叩]{1,}|[抠抠]{1,}|[扣扣]{1,})(?:[\u4e00-\u9eff]*)(?:[:,:]?)([\d\s]{6,})/
    WX_REG = /(?:[加+微++➕薇?vV威卫星♥❤姓xX信]{2,}|weixin|weix)(?:[,❤️.\s]?)(?:[\u4e00-\u9eff]?)(?:[:,:]?)([\w\s]{6,})/

    # Types for which flagged content is rejected outright instead of being
    # returned with is_mod set.
    USER_FACING_TYPES = %w[username signature dialog nickname].freeze
    # Cloud labels that turn a 'block' suggestion into a hard rejection.
    ALIYUN_BAN_LABELS = %w[politics terrorism].freeze
    # Minimum rate for a 'block' suggestion with a ban label to be rejected.
    ALIYUN_BAN_RATE = 70
    # Minimum rate for any other 'block' suggestion to be sent to moderation.
    ALIYUN_MOD_RATE = 90

    # @param content [String, nil] text to check
    # @param type [#to_s] which StopWord field/column governs the check
    # @param options [Hash] optional :ip and :user_id, forwarded to the log
    def initialize(content, type = 'ugc', options = {})
      @is_mod = false
      @mod_words = []
      @labels = []
      @content = content
      @type = type.to_s
      @options = options
      @ip = options[:ip] || nil
      @user_id = options[:user_id] || nil
    end

    # Full check: local stop words first, then the cloud scan.
    #
    # @return [Result] untouched content when it is blank or the type is not
    #   listed in CensorBear::StopWord::FIELDS; otherwise the filtered content
    #   together with the moderation flag, trigger words and labels
    # @raise [NotPassedException] when a rule bans the content, or when the
    #   content is flagged and the type is one of USER_FACING_TYPES
    def check_text
      return Result.new(@content) if @content.blank?
      return Result.new(@content) unless censorable_type?

      # Regex filtering (currently disabled)
      # if @content.match(QQ_REG)
      #   CensorBear.info(@content, @type, 'BANNED', 'qq_regex', ip: @ip, user_id: @user_id)
      #   raise NotPassedException
      # end
      # if @content.match(WX_REG)
      #   CensorBear.info(@content, @type, 'BANNED', 'wx_regex', ip: @ip, user_id: @user_id)
      #   raise NotPassedException
      # end

      # Coarse check against the local custom word list
      local_check
      # Second opinion from the third-party cloud service
      aliyun_check unless @is_mod
      # User-related types must not be created at all once flagged
      raise NotPassedException if @is_mod && USER_FACING_TYPES.include?(@type)

      Result.new(@content, @is_mod, @mod_words.uniq, @labels.uniq)
    end

    # Reports whether the content contains any active stop word. Logs the first
    # hit and stops there. The default type is 'ugc'.
    #
    # @return [Boolean] true on the first matching stop word; false when there
    #   is none, the content is blank, or the type is not in StopWord::FIELDS
    def check_search
      return false if @content.blank?
      # @type ends up inside a SQL fragment and a dynamic method call, so it
      # must be whitelisted before either happens.
      return false unless censorable_type?

      active_stop_words.each do |word|
        next unless literal_pattern(word).match?(@content)

        CensorBear.info(
          @content, @type, action_for(word), 'check_search',
          mod_words: [word.key], ip: @ip, user_id: @user_id
        )
        return true
      end
      false
    end

    # Applies every active stop word to the content: REPLACE rewrites the
    # content, MOD flags it for moderation, BANNED rejects it. Does nothing
    # when the type is not in StopWord::FIELDS.
    #
    # @raise [NotPassedException] on the first BANNED hit
    def local_check
      return unless censorable_type?

      original_content = @content.dup
      active_stop_words.each do |word|
        finder = literal_pattern(word)
        action = action_for(word)
        next unless finder.match?(@content)

        @mod_words.push(word.key)
        case action
        when 'REPLACE'
          replacement = word.replacement.blank? ? '**' : word.replacement
          @content = @content.gsub(finder, replacement)
          log_local_hit(original_content, action, word, @content)
        when 'MOD'
          @is_mod = true
          log_local_hit(original_content, action, word, nil)
        when 'BANNED'
          # Banned words reject the content immediately
          log_local_hit(original_content, action, word, nil)
          raise NotPassedException
        end
      end
    end

    # Sends the content to AliyunGreen::Text.scan and acts on the first result
    # of the first data entry. Words and labels reported by the scan are added
    # to those already collected, so earlier local hits are not lost.
    #
    # @raise [NotPassedException] when the scan verdict is 'banned'
    def aliyun_check
      response = AliyunGreen::Text.scan(@content)
      entry = response['data'].first
      finding = entry['results'].first
      cloud_words = concat_words(finding['details'])
      cloud_labels = concat_labels(finding['details'])
      @mod_words += cloud_words
      @labels += cloud_labels

      verdict = aliyun_verdict(finding['suggestion'], finding['rate'], finding['label'])
      @is_mod = true if verdict == 'mod'
      CensorBear.info(
        entry['content'], @type, verdict, 'aliyun_check',
        filtered_content: entry['filteredContent'],
        mod_words: cloud_words,
        labels: cloud_labels,
        ip: @ip,
        user_id: @user_id,
        response: response
      )
      raise NotPassedException if verdict == 'banned'
    end

    private

    # True when @type is one of the names listed in StopWord::FIELDS.
    def censorable_type?
      CensorBear::StopWord::FIELDS.include?(@type)
    end

    # Stop words whose setting for @type is anything other than IGNORE.
    # Callers must have passed #censorable_type? first: @type is interpolated.
    def active_stop_words
      CensorBear::StopWord.where("#{@type} != 'IGNORE'")
    end

    # Regexp matching the stop word's key literally.
    def literal_pattern(word)
      Regexp.new(Regexp.escape(word.key))
    end

    # Upper-cased action the stop word configures for @type.
    def action_for(word)
      word.public_send(@type.to_sym).upcase
    end

    # Logs one local stop-word hit.
    def log_local_hit(original_content, action, word, filtered_content)
      CensorBear.info(
        original_content, @type, action, 'local_check',
        filtered_content: filtered_content,
        mod_words: [word.key],
        ip: @ip,
        user_id: @user_id
      )
    end

    # Maps a scan finding to 'banned', 'mod' or 'ignore'. The rate is only
    # compared when the suggestion is 'block'.
    def aliyun_verdict(suggestion, rate, label)
      case suggestion
      when 'block'
        return 'banned' if rate >= ALIYUN_BAN_RATE && ALIYUN_BAN_LABELS.include?(label)

        rate >= ALIYUN_MOD_RATE ? 'mod' : 'ignore'
      when 'review'
        'mod'
      else
        'ignore'
      end
    end

    # Non-blank 'label' values of the given detail entries, in order.
    def concat_labels(data)
      return [] if data.blank?

      data.map { |detail| detail['label'] }.reject(&:blank?)
    end

    # 'context' values from every detail entry that has contexts, in order.
    def concat_words(data)
      return [] if data.blank?

      data.flat_map do |detail|
        contexts = detail['contexts']
        contexts.blank? ? [] : contexts.map { |c| c['context'] }
      end
    end

    def tencent_check; end
  end
end