# -*- encoding: utf-8 -*-

require 'rubygems'
require 'time'
require 'date'
require 'fileutils'
require 'right_aws'
require 'rake'
require 'yaml'
require 'logbox'

module ObservationCompiler
  # Downloads raw observer logs from S3, merges them into per-day processed
  # log files (normalizing each line on the way), and moves the gzipped
  # results back to local storage.
  class Job
    # Options:
    #   :raw_logs_bucket     - S3 bucket holding raw logs (default "rwdata-logs")
    #   :raw_logs_prefix     - key prefix of raw log objects
    #   :processed_logs_path - local folder for finished per-day logs
    #   :working_path        - scratch folder (default: per-PID dir under tmp)
    def initialize(options = {})
      @raw_logs_bucket     = options[:raw_logs_bucket]     || "rwdata-logs"
      @raw_logs_prefix     = options[:raw_logs_prefix]     || "observer-log-"
      @processed_logs_path = options[:processed_logs_path] || "local_files"
      temp_dir = File.exist?("/apps/smartass") ? "/apps/smartass/tmp/" : "/tmp/"
      # Use || rather than ||= so the caller's options hash is never mutated.
      @working_path = options[:working_path] || "#{temp_dir}observation_compiler/#{Process.pid}"
    end

    # Fetch raw logs for every date in raw_date_range, merge them into the
    # corresponding processed files, then (even on error) sort, zip and move
    # the processed files back and clean up the working directory.
    def fetch_and_merge(raw_date_range)
      # A raw log-file for a date may contain observations from the day before.
      processed_date_range = (raw_date_range.first - 1)..(raw_date_range.last)
      create_working_folders
      copy_processed_logs_to_working_folder(processed_date_range)
      unzip_processed(processed_date_range)
      raw_date_range.each do |date|
        download_raw_logs_to_working_folder(date)
        unzip_raw(date)
        merge_raw_into_processed(date)
        remove_raw_logs(date)
      end
    ensure
      sort_processed(processed_date_range)
      zip_processed(processed_date_range)
      move_processed_back(processed_date_range)
      remove_working_path
    end

    def create_working_folders
      FileUtils.mkdir_p(raw_working_path)
      FileUtils.mkdir_p(processed_working_path)
    end

    # Scratch folder for downloaded raw logs.
    def raw_working_path
      File.join(@working_path, "r")
    end

    # Scratch folder for processed per-day logs.
    def processed_working_path
      File.join(@working_path, "p")
    end

    # File name (without .gz) of the processed log for a given date.
    def processed_log_name(date)
      "observer-log-#{date.strftime('%Y-%m-%d')}"
    end

    # S3 key prefix matching all raw logs for a given date.
    def raw_logs_prefix(date)
      @raw_logs_prefix + date.strftime('%Y-%m-%d')
    end

    # Local paths of all downloaded raw logs for a given date, sorted.
    def raw_log_paths(date)
      Dir.glob(File.join(raw_working_path, "#{raw_logs_prefix(date)}*")).sort
    end

    def remove_working_path
      # Guard so the ensure-block in fetch_and_merge does not raise if the
      # working folders were never created (e.g. mkdir_p itself failed).
      FileUtils.rm_r(@working_path) if File.exist?(@working_path)
    end

    def copy_processed_logs_to_working_folder(date_range)
      date_range.each do |date|
        name        = processed_log_name(date) + ".gz"
        source      = File.join(@processed_logs_path, name)
        destination = File.join(processed_working_path, name)
        if File.exist? source
          log "Copying #{name} to working folder"
          FileUtils.copy(source, destination)
        end
      end
    end

    # Stream every raw log object for the date from S3 into raw_working_path.
    def download_raw_logs_to_working_folder(date)
      s3 = RightAws::S3.new(*aws_keys)
      bucket = s3.bucket(@raw_logs_bucket)
      raise "Unknown bucket: #{@raw_logs_bucket}" if bucket.nil?
      raw_logs = bucket.keys(:prefix => raw_logs_prefix(date))
      raw_logs.each do |raw_log|
        log "Getting #{raw_log.name}"
        # Binary mode: the objects are gzipped, so no newline/encoding
        # translation must happen on write.
        File.open(File.join(raw_working_path, raw_log.name), "wb") do |file|
          s3.interface.get(@raw_logs_bucket, raw_log.name) do |chunk|
            file.write(chunk)
          end
        end
      end
    end

    def unzip_raw(date)
      log "Unzipping raw logs for #{date}"
      raw_log_paths(date).each do |raw_log|
        # Multi-arg system: no shell, so special characters in the file
        # name cannot be interpreted.
        system("gunzip", raw_log) if raw_log.end_with?(".gz")
      end
    end

    # Append every valid, normalized line of the date's raw logs to the
    # processed file belonging to that line's own timestamp (which may be
    # the previous day). Already-processed raw logs are skipped.
    def merge_raw_into_processed(date)
      start_time = Time.now
      count = 0
      out_files = {}
      raw_log_paths(date).each do |raw_log|
        if raw_log_already_processed?(raw_log)
          log "Skipping #{raw_log}"
          next
        else
          log "Processing #{raw_log}"
        end
        File.foreach raw_log do |line|
          log_line = LogLine.new(line)
          next unless log_line.valid?
          # Use the line's own date, not the method argument: a raw file
          # can contain observations from the day before.
          line_date = log_line.date
          name = File.join(processed_working_path, processed_log_name(line_date))
          out_files[name] ||= File.open(name, "a")
          out_files[name] << log_line.normalize
          count += 1
        end
      end
    ensure
      out_files.each_value { |file| file.close } if out_files
      log "#{count} rader på #{(Time.now - start_time).to_f}s (#{count/(Time.now - start_time).to_f} rader/s)"
    end

    # True when the last observation of the raw log is already present in
    # the corresponding processed file.
    def raw_log_already_processed?(log_file_name)
      # Look for the last observation to see if it is already processed.
      last_observation = `tail -n 1 #{log_file_name}`
      log_line = LogLine.new(last_observation)
      return false unless log_line.valid?
      date = log_line.date
      processed_file_name = File.join(processed_working_path, processed_log_name(date))
      # Chomp the observation, not the file name: a trailing newline in a
      # grep -F pattern is read as an extra empty pattern, which matches
      # every line and would make this method always return true.
      File.exist?(processed_file_name) &&
        system("grep", "-qF", last_observation.chomp, processed_file_name)
    end

    def remove_raw_logs(date)
      log "Removing raw logs for #{date}"
      raw_log_paths(date).each do |raw_log|
        FileUtils.rm(raw_log)
      end
    end

    # Sort each processed file by the timestamp fields and drop duplicates.
    def sort_processed(date_range)
      date_range.each do |date|
        name = processed_log_name(date)
        Dir.chdir processed_working_path do
          next unless File.exist?(name)
          log "Sorting #{name}"
          # Byte-wise collation so sort order is stable across locales.
          ENV['LC_ALL'] = 'C'
          ok = system "sort -t: -k2,4 #{name} | uniq > #{name}.sorted"
          raise "Sort error!" unless ok
          File.rename("#{name}.sorted", name)
        end
      end
    end

    def zip_processed(date_range)
      log "Zipping processed files"
      date_range.each do |date|
        name = processed_log_name(date)
        file = File.join(processed_working_path, name)
        system("gzip", file) if File.exist? file
      end
    end

    def unzip_processed(date_range)
      log "Unzipping processed files"
      date_range.each do |date|
        name = processed_log_name(date) + ".gz"
        file = File.join(processed_working_path, name)
        system("gunzip", file) if File.exist? file
      end
    end

    def move_processed_back(date_range)
      date_range.each do |date|
        name        = processed_log_name(date) + ".gz"
        source      = File.join(processed_working_path, name)
        destination = File.join(@processed_logs_path, name)
        if File.exist? source
          log "Moving #{name} back"
          FileUtils.move(source, destination)
        end
      end
    end

    # Progress output; silenced when TEST_RUN is defined.
    def log msg
      unless defined?(TEST_RUN)
        puts msg
      end
    end

    DEFAULT_KEY_FILE = '/etc/s3_key.yml'

    # AWS credentials: from the key file when present, otherwise from the
    # environment plus the local keychain.
    def aws_keys
      if File.exist? DEFAULT_KEY_FILE
        hash = YAML.load_file(DEFAULT_KEY_FILE)
        [hash[:access_key_id], hash[:secret_access_key]]
      else
        access_key_id = ENV['OBSENTER_S3_KEY']
        secret_access_key = secret_access_key_from_keychain!(access_key_id)
        [access_key_id, secret_access_key]
      end
    end

    # These two methods are borrowed from Awsborn
    def secret_access_key_from_keychain! (key_id)
      secret = secret_access_key_from_keychain key_id
      raise "Could not find secret access key for #{key_id}" if secret.to_s == ''
      secret
    end

    def secret_access_key_from_keychain (key_id)
      @credentials ||= {}
      unless @credentials[key_id]
        dump = `security -q find-generic-password -a "#{key_id}" -g 2>&1`
        secret_key = dump[/password: "(.*)"/, 1]
        @credentials[key_id] = secret_key
      end
      @credentials[key_id]
    end
  end

  # One raw log line. Knows how to validate itself, extract its timestamp,
  # and normalize S3/Apache formats into a single canonical form.
  class LogLine
    def initialize(line)
      @line = Logbox::StringEncoder.iconv(line)
    end

    # A line is valid iff normalization succeeds.
    def valid?
      normalize
      true
    rescue
      false
    end

    # Convert the line, in place, to the canonical Apache-style format with
    # a UTC timestamp, and return it.
    def normalize
      normalize_s3_format
      normalize_apache_format
      normalize_timestamp
      @line
    end

    TIMESTAMP_MATCHER = /(\d+)\/(\w+)\/(\d+):(\d+):(\d+):(\d+)\s([-+]?\d{2})/

    # The line's timestamp as UTC Time (memoized).
    def timestamp
      unless @timestamp
        match = @line.match(TIMESTAMP_MATCHER)
        @timestamp = Time.utc(match[3], match[2], match[1], match[4], match[5], match[6])
        @timestamp -= match[7].to_i * 3600  # Correct the zone. Works only on whole hours timezones.
      end
      @timestamp
    end

    def date
      # send because Time#to_date is private in some Ruby versions.
      timestamp.send :to_date
    end

    def to_s
      @line
    end

    private

    S = '(\\S+)'
    # Equivalent to /("(?:\\\\|\\"|[^\\"])*")/
    Q = '("(?:\\\\\\\\|\\\\"|[^\\\\"])*")'
    TIMESTAMP = '(\\[[^\\]]+\\])'
    S3_FORMAT = Regexp.new('^' + [S,S,TIMESTAMP,S,S,S,S,S,Q,S,S,S,S,S,S,Q,Q,S].join(' '), 'm')

    # Rewrite an S3 access-log line into the canonical Apache-style form:
    #   ip - - [timestamp] "request" status bytes "referer" "agent" "-" "-"
    # Example S3 input (abridged):
    #   %Q{owner bucket [16/Mar/2010:16:00:00 +0000] 85.225.221.221 requester
    #      requestID operation key "GET /log.gif?... HTTP/1.1" 200 - 35 35 6 5
    #      "http://referer..." "Mozilla/5.0 ..." -}
    def normalize_s3_format
      match = @line.match(S3_FORMAT)
      if match
        @line = %Q(#{match[4]} - - #{match[3]} #{match[9]} #{match[10]} #{match[12]} #{match[16]} #{match[17]} "-" "-"\n)
      end
      @line
    end

    APACHE_WITHOUT_COOKIES = Regexp.new('^' + [S,S,S,TIMESTAMP,Q,S,S,Q,Q].join(' ') + '$')

    def normalize_apache_format
      # Add third party cookies at end if they are not there.
      # %Q{124.191.88.9 - - [26/May/2009:23:59:50 +0000] "GET /log.gif" "Mozilla/5.0"}
      match = @line.match(APACHE_WITHOUT_COOKIES)
      if match
        @line = %Q(#{match[0]} "-" "-"\n)
      end
      @line
    end

    def normalize_timestamp
      # 12/Apr/2010:09:07:23 +0200 => 12/Apr/2010:07:07:23 +0000
      match = @line.match(/^(.*?\[)([^\]]+)(\].+)$/m)
      unless match[2].end_with?('0000')
        @line = "#{match[1]}#{timestamp.strftime('%d/%b/%Y:%H:%M:%S +0000')}#{match[3]}"
      end
      @line
    end
  end
end