module TreasureData
module Command

  # TODO
  # Number of network-level errors wait_job tolerates while polling a job.
  # The budget is shared by the whole wait (it is never reset after a
  # successful poll); once it is used up the next error is re-raised.
  JOB_WAIT_MAX_RETRY_COUNT_ON_NETWORK_ERROR = 10

  # Numeric job priority => label shown to the user.
  PRIORITY_FORMAT_MAP = {
    -2 => 'VERY LOW',
    -1 => 'LOW',
    0 => 'NORMAL',
    1 => 'HIGH',
    2 => 'VERY HIGH',
  }

  # Pattern matching a user-supplied priority (name or number) => numeric
  # priority. Every pattern is anchored to the whole string.
  PRIORITY_PARSE_MAP = {
    /\Avery[ _\-]?low\z/i => -2,
    /\A-2\z/ => -2,
    /\Alow\z/i => -1,
    /\A-1\z/ => -1,
    /\Anorm(?:al)?\z/i => 0,
    /\A[\-\+]?0\z/ => 0,
    /\Ahigh\z/i => 1,
    /\A[\+]?1\z/ => 1,
    /\Avery[ _\-]?high\z/i => 2,
    /\A[\+]?2\z/ => 2,
  }

  # Prints a table of jobs.
  #
  # op - command option parser; the value returned by op.cmd_parse is used as
  #      the maximum number of jobs to list (20 when it is nil).
  #
  # Options: -p/--page and -s/--skip choose where the listing starts,
  # -R/-S/-E restrict it to one status, --slow restricts it to jobs slower
  # than the given number of seconds (3600 when no value is given).
  def job_list(op)
    page = 0
    skip = 0
    status = nil
    slower_than = nil

    op.on('-p', '--page PAGE', 'skip N pages', Integer) {|i|
      page = i
    }
    op.on('-s', '--skip N', 'skip N jobs', Integer) {|i|
      skip = i
    }
    op.on('-R', '--running', 'show only running jobs', TrueClass) {|b|
      status = 'running'
    }
    op.on('-S', '--success', 'show only succeeded jobs', TrueClass) {|b|
      status = 'success'
    }
    op.on('-E', '--error', 'show only failed jobs', TrueClass) {|b|
      status = 'error'
    }
    op.on('--slow [SECONDS]', 'show slow queries (default threshold: 3600 seconds)', Integer) {|i|
      slower_than = i || 3600
    }
    set_render_format_option(op)

    max = op.cmd_parse

    max = (max || 20).to_i

    client = get_client

    # a page is 'max' jobs long; pages are skipped on top of --skip
    if page
      skip += max * page
    end

    conditions = nil
    if slower_than
      conditions = {:slower_than => slower_than}
    end

    # the range passed to client.jobs is inclusive on both ends
    jobs = client.jobs(skip, skip+max-1, status, conditions)

    rows = []
    jobs.each {|job|
      start = job.start_at
      elapsed = cmd_format_elapsed(start, job.end_at)
      priority = job_priority_name_of(job.priority)
      rows << {
        :JobID => job.job_id,
        :Database => job.db_name,
        :Status => job.status,
        :Type => job.type,
        :Query => job.query.to_s,
        :Start => (start ? start.localtime : ''),
        :Elapsed => elapsed,
        :Priority => priority,
        :Result => job.result_url
      }
    }

    puts cmd_render_table(rows,
      :fields => [:JobID, :Status, :Start, :Elapsed, :Priority, :Result, :Type, :Database, :Query],
      :max_width => 140,
      :render_format => op.render_format
    )
  end

  # Prints the details of one job and, unless -x/--exclude is given, its
  # result.
  #
  # op - command option parser; the value returned by op.cmd_parse is the id
  #      of the job to show.
  #
  # With -w/--wait an unfinished job is polled until it finishes before the
  # result is shown. With -v/--verbose the job's 'cmdout' and 'stderr' debug
  # output is printed after the result (when the job was waited for, that
  # output has already been streamed by wait_job).
  #
  # Raises RuntimeError when the combination of options is invalid.
  def job_show(op)
    verbose = nil
    wait = false
    output = nil
    format = nil
    render_opts = {:header => false}
    limit = nil
    exclude = false

    op.on('-v', '--verbose', 'show logs', TrueClass) {|b|
      verbose = b
    }
    op.on('-w', '--wait', 'wait for finishing the job', TrueClass) {|b|
      wait = b
    }
    op.on('-G', '--vertical', 'use vertical table to show results', TrueClass) {|b|
      render_opts[:vertical] = b
    }
    op.on('-o', '--output PATH', 'write result to the file') {|s|
      output = s
      # writing to a file defaults to tsv unless a format was already chosen
      format = 'tsv' if format.nil?
    }
    op.on('-f', '--format FORMAT', 'format of the result to write to the file (tsv, csv, json, msgpack or msgpack.gz)') {|s|
      unless ['tsv', 'csv', 'json', 'msgpack', 'msgpack.gz'].include?(s)
        raise "Unknown format #{s.dump}. Supported format: tsv, csv, json, msgpack, msgpack.gz"
      end
      format = s
    }
    op.on('-l', '--limit ROWS', 'limit the number of result rows shown when not outputting to file') {|s|
      unless s.to_i > 0
        raise "Invalid limit number. Must be a positive integer"
      end
      limit = s.to_i
    }
    op.on('-c', '--column-header', 'output of the columns\' header when the schema is available for the table (only applies to tsv and csv formats)', TrueClass) {|b|
      render_opts[:header] = b;
    }
    op.on('-x', '--exclude', 'do not automatically retrieve the job result', TrueClass) {|b|
      exclude = b
    }

    job_id = op.cmd_parse

    # parameter concurrency validation

    if output.nil? && format
      unless ['tsv', 'csv', 'json'].include?(format)
        raise "Supported formats are only tsv, csv and json without -o / --output option"
      end
    end

    if render_opts[:header]
      unless ['tsv', 'csv'].include?(format)
        raise "Option -c / --column-header is only supported with tsv and csv formats"
      end
    end

    if !output.nil? && !limit.nil?
      raise "Option -l / --limit is only valid when not outputting to file " +
            "(no -o / --output option provided)"
    end

    client = get_client

    job = client.job(job_id)

    # job types for which the query-related fields and the result are shown
    query_types = [:hive, :pig, :impala, :presto]

    puts "JobID : #{job.job_id}"
    #puts "URL : #{job.url}"
    puts "Status : #{job.status}"
    puts "Type : #{job.type}"
    puts "Database : #{job.db_name}"
    # exclude some fields from bulk_import_perform type jobs
    if query_types.include?(job.type)
      puts "Priority : #{job_priority_name_of(job.priority)}"
      puts "Retry limit : #{job.retry_limit}"
      puts "Output : #{job.result_url}"
      puts "Query : #{job.query}"
    end

    waited = wait && !job.finished?
    wait_job(job) if waited

    if query_types.include?(job.type) && !exclude
      puts "Result :"
      begin
        show_result(job, output, limit, format, render_opts)
      rescue TreasureData::NotFoundError
        # Got 404 because result not found.
      end
    end

    # wait_job already streamed the debug output, so it is only dumped here
    # when the job was not waited for
    if verbose && !waited
      ['cmdout', 'stderr'].each {|key|
        next if job.debug[key].nil?
        puts ""
        puts "#{key}:"
        job.debug[key].to_s.split("\n").each {|line|
          puts " " + line
        }
      }
    end

    $stderr.puts "Use '-v' option to show detailed messages." unless verbose
  end

  # Prints the status the client reports for one job.
  #
  # op - command option parser; the value returned by op.cmd_parse is the
  #      job id.
  def job_status(op)
    job_id = op.cmd_parse
    client = get_client

    puts client.job_status(job_id)
  end

  # Asks the client to kill one job and reports the outcome on stderr.
  #
  # op - command option parser; the value returned by op.cmd_parse is the
  #      job id.
  #
  # When the status returned by client.kill is one of
  # TreasureData::Job::FINISHED_STATUS the process exits with status 0 after
  # reporting that the job had already finished.
  def job_kill(op)
    job_id = op.cmd_parse

    client = get_client

    former_status = client.kill(job_id)
    if TreasureData::Job::FINISHED_STATUS.include?(former_status)
      $stderr.puts "Job #{job_id} is already finished (#{former_status})"
      exit 0
    end

    if former_status == TreasureData::Job::STATUS_RUNNING
      $stderr.puts "Job #{job_id} is killed."
    else
      $stderr.puts "Job #{job_id} is canceled."
    end
  end

  private

  # Polls a job every 2 seconds until it is finished, printing the lines of
  # its 'cmdout' and 'stderr' debug output that have not been printed yet.
  #
  # job        - object responding to finished?, update_status! and debug.
  # first_call - when true, the job is polled at least once even if it
  #              already reports being finished.
  #
  # Timeout, system call, EOF and socket errors raised while polling are
  # retried up to JOB_WAIT_MAX_RETRY_COUNT_ON_NETWORK_ERROR times in total;
  # after that the error is re-raised.
  def wait_job(job, first_call = false)
    $stderr.puts "queued..."

    cmdout_lines = 0
    stderr_lines = 0
    max_error_counts = JOB_WAIT_MAX_RETRY_COUNT_ON_NETWORK_ERROR

    while first_call || !job.finished?
      first_call = false
      begin
        sleep 2
        job.update_status!
      rescue Timeout::Error, SystemCallError, EOFError, SocketError
        if max_error_counts <= 0
          raise
        end
        max_error_counts -= 1
        retry
      end

      # only the lines past what has been printed so far; slicing beyond the
      # end of the array yields nil, hence the fallback to []
      cmdout = job.debug['cmdout'].to_s.split("\n")[cmdout_lines..-1] || []
      stderr = job.debug['stderr'].to_s.split("\n")[stderr_lines..-1] || []
      (cmdout + stderr).each {|line|
        puts " "+line
      }
      cmdout_lines += cmdout.size
      stderr_lines += stderr.size
    end
  end

  # Writes the job result to the file at 'output' when it is given,
  # otherwise renders it on stdout.
  def show_result(job, output, limit, format, render_opts={})
    if output
      write_result(job, output, limit, format, render_opts)
      puts "written to #{output} in #{format} format"
    else
      render_result(job, limit, format, render_opts)
    end
  end

  # Writes the job result in the given format.
  #
  # job         - object responding to result_each, hive_result_schema and
  #               result_format.
  # output      - path of the file to write, or nil to write to stdout.
  # limit       - maximum number of rows to write; only honored when output
  #               is nil.
  # format      - 'json', 'csv', 'tsv', 'msgpack' or 'msgpack.gz'.
  # render_opts - when render_opts[:header] is set and the job has a result
  #               schema, csv and tsv output starts with a row of column
  #               names.
  #
  # Raises RuntimeError for any other format.
  def write_result(job, output, limit, format, render_opts={})
    # the next 3 formats allow writing to both a file and stdout
    case format
    when 'json'
      require 'yajl'
      open_file(output, "w") {|f|
        f.write "["
        n_rows = 0
        job.result_each {|row|
          f.write ",\n" if n_rows > 0
          f.write Yajl.dump(row)
          n_rows += 1
          break if output.nil? && !limit.nil? && n_rows == limit
        }
        f.write "]"
      }
      puts if output.nil?

    when 'csv'
      require 'yajl'
      require 'csv'

      open_file(output, "w") {|f|
        writer = CSV.new(f)
        n_rows = 0
        # output headers
        if render_opts[:header] && job.hive_result_schema
          writer << job.hive_result_schema.map {|name,type|
            name
          }
        end
        # output data
        job.result_each {|row|
          # TODO limit the # of columns
          writer << row.map {|col|
            dump_column(col)
          }
          n_rows += 1
          break if output.nil? && !limit.nil? && n_rows == limit
        }
      }

    when 'tsv'
      require 'yajl'
      open_file(output, "w") {|f|
        # output headers
        if render_opts[:header] && job.hive_result_schema
          # tabs go between the names only, exactly like the data rows below;
          # a trailing tab would add an extra, empty column to the header
          names = job.hive_result_schema.map {|name,type| name }
          f.write(names.join("\t") + "\n")
        end
        # output data
        n_rows = 0
        job.result_each {|row|
          n_cols = 0
          row.each {|col|
            f.write "\t" if n_cols > 0
            # TODO limit the # of columns
            f.write dump_column(col)
            n_cols += 1
          }
          f.write "\n"
          n_rows += 1
          break if output.nil? && !limit.nil? && n_rows == limit
        }
      }

    # these last 2 formats are only valid if writing the result to file through the -o/--output option.

    when 'msgpack'
      open_file(output, "wb") {|f|
        job.result_format('msgpack', f)
      }

    when 'msgpack.gz'
      open_file(output, "wb") {|f|
        job.result_format('msgpack.gz', f)
      }

    else
      raise "Unknown format #{format.inspect}"
    end
  end

  # Yields an IO to write to: STDOUT when 'output' is nil, otherwise the file
  # at 'output' opened with 'mode'. A file opened here is closed when the
  # block is done, even if it raises; STDOUT is left open.
  def open_file(output, mode)
    f = nil
    if output.nil?
      yield STDOUT
    else
      f = File.open(output, mode)
      yield f
    end
  ensure
    if f
      f.close unless f.closed?
    end
  end

  # Prints the job result on stdout: as a table when 'format' is nil,
  # otherwise through write_result in the given format.
  #
  # Note that in the tabular case render_opts is modified in place
  # (:max_width and, when a schema is available, :change_fields).
  def render_result(job, limit, format=nil, render_opts={})
    require 'yajl'

    if format.nil?
      # display result in tabular format
      rows = []
      n_rows = 0
      job.result_each {|row|
        # TODO limit number of rows to show
        rows << row.map {|v|
          dump_column(v)
        }
        n_rows += 1
        break if !limit.nil? && n_rows == limit
      }

      render_opts[:max_width] = 10000
      if job.hive_result_schema
        render_opts[:change_fields] = job.hive_result_schema.map { |name,type| name }
      end

      puts cmd_render_table(rows, render_opts)
    else
      # display result in any of: json, csv, tsv.
      # msgpack and msgpack.gz are not supported for stdout output
      write_result(job, nil, limit, format, render_opts)
    end
  end

  # Converts one result value to the string that is displayed or written:
  # strings are used as they are, anything else is dumped with Yajl.
  def dump_column(v)
    require 'yajl'

    s = v.is_a?(String) ? v.to_s : Yajl.dump(v)
    # Here does UTF-8 -> UTF-16LE -> UTF8 conversion:
    #   a) to make sure the string doesn't include invalid byte sequence
    #   b) to display multi-byte characters as it is
    #   c) encoding from UTF-8 to UTF-8 doesn't check/replace invalid chars
    #   d) UTF-16LE was slightly faster than UTF-16BE, UTF-32LE or UTF-32BE
    s = s.encode('UTF-16LE', 'UTF-8', :invalid=>:replace, :undef=>:replace).encode!('UTF-8') if s.respond_to?(:encode)
    s
  end

  # Returns the label for a numeric priority; 'NORMAL' for any value that is
  # not a key of PRIORITY_FORMAT_MAP.
  def job_priority_name_of(id)
    PRIORITY_FORMAT_MAP[id] || 'NORMAL'
  end

  # Returns the numeric priority for a user-supplied name or number, or nil
  # when no pattern of PRIORITY_PARSE_MAP matches it.
  def job_priority_id_of(name)
    PRIORITY_PARSE_MAP.each_pair {|pattern,id|
      return id if pattern.match(name)
    }
    return nil
  end

end
end