require 'td/helpers'

module TreasureData
module Command

  HIVE_RESERVED_KEYWORDS = %W[
    TRUE FALSE ALL AND OR NOT LIKE ASC DESC ORDER BY GROUP WHERE FROM AS SELECT DISTINCT INSERT OVERWRITE
    OUTER JOIN LEFT RIGHT FULL ON PARTITION PARTITIONS TABLE TABLES TBLPROPERTIES SHOW MSCK DIRECTORY LOCAL
    TRANSFORM USING CLUSTER DISTRIBUTE SORT UNION LOAD DATA INPATH IS NULL CREATE EXTERNAL ALTER DESCRIBE
    DROP RENAME TO COMMENT BOOLEAN TINYINT SMALLINT INT BIGINT FLOAT DOUBLE DATE DATETIME TIMESTAMP STRING
    BINARY ARRAY MAP REDUCE PARTITIONED CLUSTERED SORTED INTO BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED
    COLLECTION ITEMS KEYS LINES STORED SEQUENCEFILE TEXTFILE INPUTFORMAT OUTPUTFORMAT LOCATION TABLESAMPLE
    BUCKET OUT OF CAST ADD REPLACE COLUMNS RLIKE REGEXP TEMPORARY FUNCTION EXPLAIN EXTENDED SERDE WITH
    SERDEPROPERTIES LIMIT SET TBLPROPERTIES
  ]

  KEY_NUM_LIMIT = 512

  def table_create(op)
    type = nil
    primary_key = nil
    primary_key_type = nil

    op.on('-T', '--type TYPE', 'set table type (log or item)') {|s|
      unless ['item', 'log'].include?(s)
        raise "Unknown table type #{s.dump}. Supported types: log and item"
      end
      type = s.to_sym
    }
    op.on('--primary-key PRIMARY_KEY_AND_TYPE', '[primary key]:[primary key type(int or string)]') {|s|
      unless /\A[\w]+:(string|int)\z/ =~ s
        $stderr.puts "--primary-key PRIMARY_KEY_AND_TYPE is required, and should be in the format [primary key]:[primary key type]"
        exit 1
      end
      args = s.split(':')
      if args.length != 2
        # this really shouldn't happen with the above regex
        exit 1
      end
      primary_key = args[0]
      primary_key_type = args[1]
    }

    db_name, table_name = op.cmd_parse

    API.validate_table_name(table_name)

    if HIVE_RESERVED_KEYWORDS.include?(table_name.upcase)
      $stderr.puts "* WARNING *"
      $stderr.puts "  '#{table_name}' is a reserved keyword in Hive. We recommend renaming the table."
      $stderr.puts "  For a list of all reserved keywords, see our FAQ: http://docs.treasure-data.com/articles/faq"
    end

    if type == :item && (primary_key.nil? || primary_key_type.nil?)
      $stderr.puts "for TYPE 'item', the primary-key is required"
      exit 1
    end

    client = get_client

    begin
      if type == :item
        client.create_item_table(db_name, table_name, primary_key, primary_key_type)
      else
        client.create_log_table(db_name, table_name)
      end
    rescue NotFoundError
      cmd_debug_error $!
      $stderr.puts "Database '#{db_name}' does not exist."
      $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "db:create #{db_name}' to create the database."
      exit 1
    rescue AlreadyExistsError
      cmd_debug_error $!
      $stderr.puts "Table '#{db_name}.#{table_name}' already exists."
      exit 1
    end

    $stderr.puts "Table '#{db_name}.#{table_name}' is created."
  end

  def table_delete(op)
    force = false
    op.on('-f', '--force', 'never prompt', TrueClass) {|b|
      force = true
    }

    db_name, table_name = op.cmd_parse

    client = get_client

    begin
      unless force
        table = get_table(client, db_name, table_name)
        $stderr.print "Do you really want to delete '#{table_name}' in '#{db_name}'? [y/N]: "
        ok = nil
        while line = $stdin.gets
          line.strip!
          if line =~ /^y(?:es)?$/i
            ok = true
            break
          elsif line.empty? || line =~ /^n(?:o)?$/i
            break
          else
            $stderr.print "Type 'Y' or 'N': "
          end
        end
        unless ok
          $stderr.puts "canceled."
          exit 1
        end
      end
      client.delete_table(db_name, table_name)
    rescue NotFoundError
      cmd_debug_error $!
      $stderr.puts "Table '#{db_name}.#{table_name}' does not exist."
      $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "table:list #{db_name}' to show the list of tables."
      exit 1
    end

    $stderr.puts "Table '#{db_name}.#{table_name}' is deleted."
  end
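  # Example usage (a hedged sketch, assuming the standard `td` CLI entry point;
  # database and table names below are placeholders):
  #   $ td table:create my_db my_log_table
  #   $ td table:create my_db my_item_table --type item --primary-key id:string
  #   $ td table:delete my_db my_log_table --force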
  def table_list(op)
    require 'parallel'

    format = 'table'
    num_threads = 4
    show_size_in_bytes = false

    op.on('-n', '--num_threads VAL', 'number of threads to get list in parallel') {|i|
      num_threads = Integer(i)
    }
    op.on('--show-bytes', 'show estimated table size in bytes') {
      show_size_in_bytes = true
    }
    set_render_format_option(op)

    db_name = op.cmd_parse

    client = get_client

    if db_name
      database = get_database(client, db_name)
      databases = [database]
    else
      databases = client.databases
    end

    has_item = databases.select {|db|
      db.tables.select {|table| table.type == :item }.length > 0
    }.length > 0

    rows = []
    ::Parallel.each(databases, :in_threads => num_threads) {|db|
      begin
        db.tables.each {|table|
          pschema = table.schema.fields.map {|f|
            "#{f.name}:#{f.type}"
          }.join(', ')
          new_row = {
            :Database => db.name,
            :Table => table.name,
            :Type => table.type.to_s,
            :Count => TreasureData::Helpers.format_with_delimiter(table.count),
            :Size => show_size_in_bytes ? TreasureData::Helpers.format_with_delimiter(table.estimated_storage_size) : table.estimated_storage_size_string,
            'Last import' => table.last_import ? table.last_import.localtime : nil,
            'Last log timestamp' => table.last_log_timestamp ? table.last_log_timestamp.localtime : nil,
            :Schema => pschema
          }
          if has_item and table.type == :item
            new_row['Primary key'] = "#{table.primary_key}:#{table.primary_key_type}"
          end
          rows << new_row
        }
      rescue APIError => e
        # ignore permission errors because db:list shows all databases,
        # even ones whose tables the user is not allowed to access
        unless e.to_s =~ /not authorized/
          raise e
        end
      end
    }
    rows = rows.sort_by {|map|
      [map[:Database], map[:Type].size, map[:Table]]
    }

    fields = []
    if has_item
      fields = [:Database, :Table, :Type, :Count, :Size, 'Last import', 'Last log timestamp', 'Primary key', :Schema]
    else
      fields = [:Database, :Table, :Type, :Count, :Size, 'Last import', 'Last log timestamp', :Schema]
    end
    puts cmd_render_table(rows, :fields => fields, :max_width => 500, :render_format => op.render_format)

    if rows.empty?
      if db_name
        $stderr.puts "Database '#{db_name}' has no tables."
        $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "table:create <db_name> <table_name>' to create a table."
      elsif databases.empty?
        $stderr.puts "There are no databases."
        $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "db:create <db_name>' to create a database."
      else
        $stderr.puts "There are no tables."
        $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "table:create <db_name> <table_name>' to create a table."
      end
    end
  end
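  # Example usage (a hedged sketch, assuming the standard `td` CLI entry point):
  #   $ td table:list                       # list tables across all databases
  #   $ td table:list my_db --show-bytes -n 8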
  def table_swap(op)
    db_name, table_name1, table_name2 = op.cmd_parse

    client = get_client

    table1 = get_table(client, db_name, table_name1)
    table2 = get_table(client, db_name, table_name2)

    client.swap_table(db_name, table_name1, table_name2)

    $stderr.puts "'#{db_name}.#{table_name1}' and '#{db_name}.#{table_name2}' are swapped."
  end

  def table_show(op)
    db_name, table_name = op.cmd_parse

    client = get_client

    table = get_table(client, db_name, table_name)

    puts "Name        : #{table.db_name}.#{table.name}"
    puts "Type        : #{table.type}"
    puts "Count       : #{table.count}"
    puts "Primary key : #{table.primary_key}:#{table.primary_key_type}" if table.type == :item
    puts "Schema      : ("
    table.schema.fields.each {|f|
      puts "    #{f.name}:#{f.type}"
    }
    puts ")"
  end

  def table_tail(op)
    from = nil
    to = nil
    count = 10
    pretty = nil

    op.on('-t', '--to TIME', 'end time of logs to get') {|s|
      if s.to_i.to_s == s
        to = s.to_i
      else
        require 'time'
        to = Time.parse(s).to_i
      end
    }
    op.on('-f', '--from TIME', 'start time of logs to get') {|s|
      if s.to_i.to_s == s
        from = s.to_i
      else
        require 'time'
        from = Time.parse(s).to_i
      end
    }
    op.on('-n', '--count N', 'number of logs to get', Integer) {|i|
      count = i
    }
    op.on('-P', '--pretty', 'pretty print', TrueClass) {|b|
      pretty = b
    }

    db_name, table_name = op.cmd_parse

    client = get_client

    table = get_table(client, db_name, table_name)

    rows = table.tail(count, to, from)

    require 'json'
    if pretty
      opts = {
        :indent => ' ' * 2,
        :object_nl => "\n",
        :space => ' '
      }
      rows.each {|row|
        puts row.to_json(opts)
      }
    else
      rows.each {|row|
        puts row.to_json
      }
    end
  end

  def table_export(op)
    from = nil
    to = nil
    s3_bucket = nil
    wait = false

    ## TODO
    #op.on('-t', '--to TIME', 'end time of logs to get') {|s|
    #  if s.to_i.to_s == s
    #    to = s
    #  else
    #    require 'time'
    #    to = Time.parse(s).to_i
    #  end
    #}
    #op.on('-f', '--from TIME', 'start time of logs to get') {|s|
    #  if s.to_i.to_s == s
    #    from = s
    #  else
    #    require 'time'
    #    from = Time.parse(s).to_i
    #  end
    #}
    op.on('-w', '--wait', 'wait until the job is completed', TrueClass) {|b|
      wait = b
    }
    op.on('--s3-bucket NAME', 'name of the s3 bucket to output') {|s|
      s3_bucket = s
    }

    db_name, table_name = op.cmd_parse

    unless s3_bucket
      $stderr.puts "--s3-bucket NAME option is required"
      exit 1
    end

    client = get_client

    table = get_table(client, db_name, table_name)

    opts = {}
    opts['s3_bucket'] = s3_bucket
    opts['s3_file_format'] = 'json.gz'
    opts['from'] = from.to_s if from
    opts['to'] = to.to_s if to

    job = table.export('s3', opts)

    $stderr.puts "Export job #{job.job_id} is queued."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{job.job_id}' to show the status."

    if wait && !job.finished?
      wait_job(job)
      puts "Status     : #{job.status}"
    end
  end
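  # Example usage (a hedged sketch, assuming the standard `td` CLI entry point;
  # -f/-t accept either a UNIX timestamp or any string Time.parse understands):
  #   $ td table:tail my_db www_access -n 30 --pretty
  #   $ td table:tail my_db www_access -f "2013-01-01 00:00:00" -t 1357002000
  #   $ td table:export my_db www_access --s3-bucket my-bucket -w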
  def table_partial_delete(op)
    from = nil
    to = nil
    wait = false

    op.on('-t', '--to TIME', 'end time of logs to delete, in Unix time, a multiple of 3600 (1 hour)') {|s|
      if s.to_i.to_s == s
        # UNIX time
        to = s.to_i
      else
        require 'time'
        to = Time.parse(s).to_i
      end
    }
    op.on('-f', '--from TIME', 'start time of logs to delete, in Unix time, a multiple of 3600 (1 hour)') {|s|
      if s.to_i.to_s == s
        from = s.to_i
      else
        require 'time'
        from = Time.parse(s).to_i
      end
    }
    op.on('-w', '--wait', 'wait for the job to finish', TrueClass) {|b|
      wait = b
    }

    db_name, table_name = op.cmd_parse

    unless from
      $stderr.puts "-f, --from TIME option is required"
      exit 1
    end

    unless to
      $stderr.puts "-t, --to TIME option is required"
      exit 1
    end

    if from % 3600 != 0 || to % 3600 != 0
      $stderr.puts "Time for the -f / --from and -t / --to options must be a multiple of 3600 (1 hour)"
      exit 1
    end

    client = get_client

    table = get_table(client, db_name, table_name)

    opts = {}
    job = client.partial_delete(db_name, table_name, to, from, opts)

    $stderr.puts "Partial delete job #{job.job_id} is queued."
    $stderr.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{job.job_id}' to show the status."

    if wait && !job.finished?
      wait_job(job)
      puts "Status     : #{job.status}"
    end
  end

  def table_expire(op)
    db_name, table_name, expire_days = op.cmd_parse

    expire_days = expire_days.to_i
    if expire_days <= 0
      $stderr.puts "Table expiration days must be greater than 0."
      return
    end

    client = get_client
    client.update_expire(db_name, table_name, expire_days)

    $stderr.puts "Table set to expire data older than #{expire_days} days."
  end

  IMPORT_TEMPLATES = {
    'apache' => [
      /^([^ ]*) [^ ]* ([^ ]*) \[([^\]]*)\] "(\S+)(?: +([^ ]*) +\S*)?" ([^ ]*) ([^ ]*)(?: "([^\"]*)" "([^\"]*)")?$/,
      ['host', 'user', 'time', 'method', 'path', 'code', 'size', 'referer', 'agent'],
      "%d/%b/%Y:%H:%M:%S %z"],
    'syslog' => [
      /^([^ ]* [^ ]* [^ ]*) ([^ ]*) ([a-zA-Z0-9_\/\.\-]*)(?:\[([0-9]+)\])?[^\:]*\: *(.*)$/,
      ['time', 'host', 'ident', 'pid', 'message'],
      "%b %d %H:%M:%S"],
  }

  # TODO import-item
  # TODO tail
  def table_import(op)
    op.banner << "\nsupported formats:\n"
    op.banner << "  apache\n"
    op.banner << "  syslog\n"
    op.banner << "  msgpack\n"
    op.banner << "  json\n"

    format = 'apache'
    time_key = 'time'
    auto_create = false

    op.on('--format FORMAT', "file format (default: #{format})") {|s|
      format = s
    }
    op.on('--apache', "same as --format apache; apache common log format") {
      format = 'apache'
    }
    op.on('--syslog', "same as --format syslog; syslog") {
      format = 'syslog'
    }
    op.on('--msgpack', "same as --format msgpack; msgpack stream format") {
      format = 'msgpack'
    }
    op.on('--json', "same as --format json; LF-separated json format") {
      format = 'json'
    }
    op.on('-t', '--time-key COL_NAME', "time key name for json and msgpack format (e.g. 'created_at')") {|s|
      time_key = s
    }
    op.on('--auto-create-table', "create the table and database if they don't exist", TrueClass) {|b|
      auto_create = b
    }

    db_name, table_name, *paths = op.cmd_parse

    client = get_client

    if auto_create
      # Merge with db_create and table_create after refactoring
      API.validate_database_name(db_name)
      begin
        client.database(db_name)
      rescue NotFoundError
        begin
          client.create_database(db_name)
          $stderr.puts "Database '#{db_name}' is created."
        rescue AlreadyExistsError
          # do nothing
        end
      rescue ForbiddenError
        # do nothing
      end

      API.validate_table_name(table_name)
      begin
        client.create_log_table(db_name, table_name)
        $stderr.puts "Table '#{db_name}.#{table_name}' is created."
      rescue AlreadyExistsError
        # do nothing
      end
    end
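
    # Select a record parser for the requested format: 'json' and 'msgpack' are
    # structured formats keyed by --time-key, while 'apache' and 'syslog' are
    # parsed line by line with the regexp templates in IMPORT_TEMPLATES.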
    case format
    when 'json', 'msgpack'
      #unless time_key
      #  $stderr.puts "-t, --time-key COL_NAME (e.g. '-t created_at') parameter is required for #{format} format"
      #  exit 1
      #end
      if format == 'json'
        require 'json'
        require 'time'
        parser = JsonParser.new(time_key)
      else
        parser = MessagePackParser.new(time_key)
      end

    else # apache, syslog
      regexp, names, time_format = IMPORT_TEMPLATES[format]
      if !regexp || !names || !time_format
        $stderr.puts "Unknown format '#{format}'"
        exit 1
      end
      parser = TextParser.new(names, regexp, time_format)
    end

    begin
      db = client.database(db_name)
    rescue ForbiddenError => e
      puts "Warning: database and table validation skipped - #{e.message}"
    else
      begin
        table = db.table(table_name)
      rescue ForbiddenError => e
        puts "Warning: table validation skipped - #{e.message}"
      end
    end

    require 'zlib'

    begin
      files = paths.map {|path|
        if path == '-'
          $stdin
        elsif path =~ /\.gz$/
          require 'td/compat_gzip_reader'
          Zlib::GzipReader.open(path)
        else
          File.open(path)
        end
      }
    rescue Errno::ENOENT => e
      raise ImportError, e.message
    end

    require 'msgpack'
    require 'tempfile'
    #require 'thread'

    files.zip(paths).each {|file, path|
      import_log_file(file, path, client, db_name, table_name, parser)
    }

    puts "done."
  end

  private
  def import_log_file(file, path, client, db_name, table_name, parser)
    puts "importing #{path}..."

    out = Tempfile.new('td-import')
    out.binmode if out.respond_to?(:binmode)

    writer = Zlib::GzipWriter.new(out)

    n = 0
    x = 0
    has_bignum = false
    parser.call(file, path) {|record|
      entry = begin
        record.to_msgpack
      rescue RangeError
        has_bignum = true
        TreasureData::API.normalized_msgpack(record)
      end
      writer.write entry
      n += 1
      x += 1

      if n % 10000 == 0 # by records imported
        puts "  imported #{n} entries from #{path}..."
      # TODO size
      elsif out.pos > 1024 * 1024 # by 1 MB chunks
        puts "  imported #{n} entries from #{path}..."
        begin
          writer.finish
          size = out.pos
          out.pos = 0

          puts "  uploading #{size} bytes..."
          client.import(db_name, table_name, "msgpack.gz", out, size)

          out.truncate(0)
          out.pos = 0
          x = 0
          writer = Zlib::GzipWriter.new(out)
        rescue
          $stderr.puts "  #{$!}"
          return 1 # TODO error
        end
      end
    }

    # if there is anything parsed but not imported yet
    if x != 0
      writer.finish
      size = out.pos
      out.pos = 0

      puts "  uploading #{size} bytes..."
      # TODO upload on background thread
      client.import(db_name, table_name, "msgpack.gz", out, size)
    end

    # raise an exception if no record was imported
    if n == 0
      raise ImportError, "no valid record to import from #{path}"
    end

    puts "  imported #{n} entries from #{path}."
    $stderr.puts normalized_message if has_bignum
  ensure
    out.close rescue nil
    writer.close rescue nil
  end

  require 'date' # DateTime#strptime
  require 'time' # Time#strptime, Time#parse
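  # A worked example (hedged; the log line below is illustrative): with the
  # 'apache' template above, a combined-log line such as
  #   127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://example.com/" "Mozilla/4.08"
  # yields a record like
  #   {"host"=>"127.0.0.1", "user"=>"frank", "time"=>971211336, "method"=>"GET",
  #    "path"=>"/apache_pb.gif", "code"=>"200", "size"=>"2326",
  #    "referer"=>"http://example.com/", "agent"=>"Mozilla/4.08"}
  # where the "time" capture is parsed with "%d/%b/%Y:%H:%M:%S %z" and
  # converted to UNIX time by TextParser below.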
  class TextParser
    def initialize(names, regexp, time_format)
      @names = names
      @regexp = regexp
      @time_format = time_format
    end

    def call(file, path, &block)
      i = 0
      file.each_line {|line|
        i += 1
        begin
          line.rstrip!
          m = @regexp.match(line)
          unless m
            raise "invalid log format at #{path}:#{i}"
          end

          record = {}

          cap = m.captures
          @names.each_with_index {|name, cap_i|
            if value = cap[cap_i]
              if name == "time"
                value = parse_time(value).to_i
              end
              record[name] = value
            end
          }

          block.call(record)

        rescue
          $stderr.puts "  skipped: #{$!}: #{line.dump}"
        end
      }
    end

    if Time.respond_to?(:strptime)
      def parse_time(value)
        Time.strptime(value, @time_format)
      end
    else
      def parse_time(value)
        Time.parse(DateTime.strptime(value, @time_format).to_s)
      end
    end
  end

  # Generic base class for both the JSON and MessagePack parsers to
  # reduce code duplication
  class StructuredParser
    def sanitize_record(record, &block)
      unless record.is_a?(Hash)
        raise "record must be a Hash"
      end

      time = record[@time_key]
      unless time
        raise "record doesn't have '#{@time_key}' column"
      end

      if record.size > KEY_NUM_LIMIT
        raise "record contains too many keys (#{record.size}, max allowed #{KEY_NUM_LIMIT})"
      end

      case time
      when Integer
        # do nothing
      else
        time = Time.parse(time.to_s).to_i
      end
      record['time'] = time

      block.call(record)
    end
    protected :sanitize_record
  end

  class JsonParser < StructuredParser
    def initialize(time_key)
      require 'json'
      @time_key = time_key
    end

    def call(file, path, &block)
      file.each_line {|line|
        begin
          record = JSON.parse(line)
          sanitize_record(record, &block)
        rescue
          # report the raw line: if JSON.parse failed, record is nil here
          $stderr.puts "  skipped: #{$!}: #{line.dump}"
        end
      }
    end
  end

  class MessagePackParser < StructuredParser
    def initialize(time_key)
      require 'msgpack'
      require 'json' # Hash#to_json, used for error reporting below
      @time_key = time_key
    end

    def call(file, path, &block)
      MessagePack::Unpacker.new(file).each {|record|
        begin
          sanitize_record(record, &block)
        rescue
          $stderr.puts "  skipped: #{$!}: #{record.to_json}"
        end
      }
    rescue EOFError
    end
  end

  require 'td/command/export' # table:export
  require 'td/command/job'    # wait_job

end
end