require 'fluent/plugin/in_http'
require 'fluent/plugin/geoip'
require "fluent/plugin/viki/version"

module Fluent
  # HTTP input plugin registered as 'viki'.
  #
  # Each request's path becomes the event tag and its parameters become the
  # event record. Before emitting, the record is normalised (legacy field
  # names renamed, 'null' strings blanked, a message id assigned) and enriched
  # with IP / country / city information through the Geoip mixin.
  class VikiInput < HttpInput
    include Geoip

    # Flash cross-domain policy served for any path containing "crossdomain.xml".
    # NOTE(review): the original heredoc body was lost (the source read
    # `CROSSDOMAIN_XML = < EOF`, which is not valid Ruby). This is a minimal
    # permissive policy reconstructed to match the `Access-Control-Allow-Origin: *`
    # header used elsewhere in this class -- confirm against the deployed policy.
    CROSSDOMAIN_XML = <<EOF
<?xml version="1.0"?>
<cross-domain-policy>
  <allow-access-from domain="*" />
</cross-domain-policy>
EOF

    # app_id => client platform, used by #remove_exceptions_from_hash.
    APP_SOURCES = {
      '100004a' => 'ios',
      '100005a' => 'android',
      '65535a'  => 'flash',
      '100017a' => 'android',
      '100016a' => 'mweb',
      '100018a' => 'xbox'
    }.freeze

    # Keys consisting solely of 13 or more digits. The original pattern was
    # /\A[0-9]+{13}\z/, a nested quantifier that accepts exactly the same
    # strings but can backtrack combinatorially on long digit runs.
    TIMESTAMP_KEY = /\A[0-9]{13,}\z/

    Plugin.register_input('viki', self)

    config_param :respond_with_empty_img, :bool, :default => true
    config_param :default_tag, :default => ''

    # Handles one HTTP request.
    #
    # @param path_info [String] request path; "/a/b" becomes the tag "a.b"
    # @param params [Hash] request parameters, also read for the
    #   HTTP_X_REQUEST_ID / HTTP_X_FORWARDED_FOR / REMOTE_ADDR entries
    # @return [Array] [status line, headers hash, body]
    #   * the cross-domain policy when the path names crossdomain.xml
    #   * 400 when building the record raises
    #   * 500 when Engine.emit raises
    #   * 200 otherwise
    def on_request(path_info, params)
      begin
        tag = path_info[1..-1].split('/').join('.')
        return return_200_crossdomain if tag.downcase.include?('crossdomain.xml')

        tag = @default_tag if tag == '' && @default_tag != ''
        record, time = build_record(params)
      rescue
        return return_400
      end

      begin
        Engine.emit(tag, time, record)
      rescue
        return return_500
      end

      # No JSON payload is ever produced here, so the empty image / empty
      # text response is always used.
      return_200(nil)
    end

    # Normalises subtitle fields in place.
    #
    # * 'bottom_subtitle' is moved to 'subtitle_lang' when the latter is unset.
    # * 'subtitle_visible', when unset, becomes nil if there is no
    #   'subtitle_lang', otherwise whether 'subtitle_lang' is non-empty.
    # * For app_ids 100106a / 100105a (xunlei and letv, per the original
    #   comment) subtitles are forced on and the language set to 'zh'.
    #
    # NOTE(review): the general branch writes 'subtitle_visible' while the
    # forced branch writes 'subtitle_enabled' -- confirm this is intended.
    #
    # @param record [Hash] event record, mutated
    def update_subtitles(record)
      if record['bottom_subtitle'] && record['subtitle_lang'].nil?
        record['subtitle_lang'] = record.delete('bottom_subtitle')
      end

      if record['subtitle_visible'].nil?
        lang = record['subtitle_lang']
        record['subtitle_visible'] = lang && lang.size > 0
      end

      # manual subtitle set to Chinese for xunlei and letv
      if %w(100106a 100105a).include?(record['app_id'])
        record['subtitle_enabled'] = true
        record['subtitle_lang'] = 'zh'
      end
    end

    # Deletes (in place) every entry of attr_hash whose value includes the
    # platform that app_id maps to in APP_SOURCES.
    #
    # @param attr_hash [Hash] attribute => collection of platform names
    # @param app_id [String, nil]
    # @return [Array, nil] the remaining keys, or nil (after printing a
    #   warning) when app_id is not a known application
    def remove_exceptions_from_hash(attr_hash, app_id)
      source = APP_SOURCES[app_id]
      if source.nil?
        # Interpolation instead of `+` so a nil app_id does not raise TypeError.
        puts "Warning: Could not identify source with app_id: #{app_id}"
        return nil
      end

      # `source` is never nil here, so include? alone implies the collection
      # is non-empty; delete_if avoids deleting while iterating with each.
      attr_hash.delete_if { |_, sources| sources.include?(source) }
      attr_hash.keys
    end

    # Derives record['domain'] from record['site'] by stripping the
    # http(s):// scheme, everything from the first "/" on, and a leading "www.".
    # Does nothing when 'site' is absent.
    #
    # @param record [Hash] event record, mutated
    def set_record_domain(record)
      site = record['site']
      return unless site

      record['domain'] = site.gsub(/^https?:\/\//, '')
                             .gsub(/([^\/]*)(.*)/, '\1')
                             .gsub(/^www\./, '')
    end

    # Fills in the IP and geo fields of the record.
    #
    # The candidate address list comes from record['ip'], falling back to the
    # X-Forwarded-For header and then REMOTE_ADDR. The raw value is kept in
    # 'ip_raw'; 'ip' becomes the address chosen by #resolve_correct_ip.
    # 'country' prefers the GeoIP result over a client supplied
    # 'country_code'. Finally the result of Geoip's city_of_ip is merged in
    # (presumably a hash of city fields -- the helper is not visible here).
    #
    # @param params [Hash] request parameters / headers
    # @param record [Hash] event record, mutated
    def set_record_ip(params, record)
      record['ip'] ||= params['HTTP_X_FORWARDED_FOR'] || params['REMOTE_ADDR']
      ips = parse_ip_candidates(record['ip'])

      rename_key(record, 'country_code', 'country')

      valid_ip, geo_country = resolve_correct_ip(ips)
      record['ip_raw'], record['ip'] = record['ip'], valid_ip
      record['country'] = geo_country || record['country']
      record.merge! city_of_ip(valid_ip)
    end

    # Picks the first address for which Geoip's country_code_of_ip returns a
    # non-nil country.
    #
    # @param ips [Array<String>, nil] candidate addresses
    # @return [Array] [ip, country]; when no candidate resolves, the first
    #   candidate (nil for a nil or empty list) paired with nil
    def resolve_correct_ip(ips)
      # Array() turns nil into []; the original guarded the loop against nil
      # but then called ips.first unconditionally, raising NoMethodError.
      candidates = Array(ips)
      candidates.each do |ip|
        geo_country = country_code_of_ip(ip)
        return [ip, geo_country] unless geo_country.nil?
      end
      [candidates.first, nil]
    end

    # 500 response; the body includes $!, the exception currently being
    # rescued by the caller.
    def return_500
      ["500 Internal Server Error", { 'Content-type' => 'text/plain' }, "500 Internal Server Error\n#{$!}\n"]
    end
    # Historical, misleading name kept so existing callers keep working.
    alias_method :return_505, :return_500

    # 400 response; the body includes $!, the exception currently being
    # rescued by the caller.
    def return_400
      ["400 Bad Request", { 'Content-type' => 'text/plain' }, "400 Bad Request\n#{$!}\n"]
    end

    # 200 response.
    #
    # @param json_hash [#to_json, nil] when given, it is returned as JSON;
    #   otherwise the body is a GIF image (respond_with_empty_img enabled)
    #   or an empty text body
    # @return [Array] [status line, headers hash, body]
    def return_200(json_hash = nil)
      if json_hash
        ["200 OK", { 'Content-type' => 'application/json' }, json_hash.to_json]
      elsif @respond_with_empty_img
        ["200 OK", { 'Content-type' => 'image/gif', 'Access-Control-Allow-Origin' => '*' }, "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;"]
      else
        ["200 OK", { 'Content-type' => 'text/plain', 'Access-Control-Allow-Origin' => '*' }, ""]
      end
    end

    # 200 response carrying CROSSDOMAIN_XML.
    def return_200_crossdomain
      headers = {
        'Content-type' => 'text/xml',
        'Access-Control-Allow-Origin' => '*',
        # Cache-Control directives are comma separated (was 'public; max-age=36000').
        'Cache-Control' => 'public, max-age=36000'
      }
      ["200 OK", headers, CROSSDOMAIN_XML]
    end

    # Generate a unique id for the event: "<time>-<5 base-36 chars>".
    # With a 10 digit epoch time the length is 10+1+5 = 16.
    # It's relatively sortable.
    #
    # @param time [#to_s] event time
    # @return [String]
    def gen_message_id(time)
      # Zero-pad so the random part is always 5 characters wide.
      suffix = rand(36**5).to_s(36).rjust(5, '0')
      "#{time}-#{suffix}"
    end

    private

    # Builds the event record and its timestamp from the request parameters.
    #
    # @param params [Hash] request parameters (not mutated; a copy is edited)
    # @return [Array] [record, time]
    def build_record(params)
      record = params.dup

      # A client supplied 't_f' overrides the event time and is dropped
      # from the record.
      time = record['t_f'] ? record.delete('t_f').to_i : Engine.now
      record['t'] ||= time.to_s

      set_record_ip(params, record)
      set_record_domain(record)

      rename_key(record, 'viki_uuid', 'uuid')
      rename_key(record, 'type', 'content_provider')
      rename_key(record, 'dev_model', 'device_id')

      record.each { |k, v| record[k] = '' if v == 'null' }

      # generate a unique event id for each event
      record['mid'] ||= params['HTTP_X_REQUEST_ID'] || gen_message_id(time)

      # rename video_view to minute_view
      record['event'] = 'minute_view' if record['event'] == 'video_view'

      # rename bottom_subtitle to subtitle_lang, add subtitle_enabled
      update_subtitles(record) if %w(video_play minute_view).include?(record['event'])

      # fix xunlei data sending timestamps
      if record['app_id'] == '100105a'
        record.delete_if { |key, _| key =~ TIMESTAMP_KEY }
      end

      [record, time]
    end

    # Moves record[from] to record[to] (overwriting it) when record[from]
    # holds a truthy value.
    def rename_key(record, from, to)
      record[to] = record.delete(from) if record[from]
    end

    # Splits a raw IP value into candidate addresses.
    #
    # @param raw [String, Array<String>, Object] a space/comma separated
    #   string, or an array of comma separated strings
    # @return [Array<String>, nil] non-blank candidates; nil for any other type
    def parse_ip_candidates(raw)
      case raw
      when String
        # "a, b" used to yield an empty candidate between a and b.
        raw.tr(' ', ',').split(',').reject(&:empty?)
      when Array
        raw.flat_map { |entry| entry.split(',').map(&:strip) }.reject(&:empty?)
      end
    end
  end
end