class TreasureData::API module Job #### ## Job API ## # => [(jobId:String, type:Symbol, status:String, start_at:String, end_at:String, result_url:String)] def list_jobs(from=0, to=nil, status=nil, conditions=nil) params = {} params['from'] = from.to_s if from params['to'] = to.to_s if to params['status'] = status.to_s if status params.merge!(conditions) if conditions code, body, res = get("/v3/job/list", params) if code != "200" raise_error("List jobs failed", res) end js = checked_json(body, %w[jobs]) result = [] js['jobs'].each {|m| job_id = m['job_id'] type = (m['type'] || '?').to_sym database = m['database'] status = m['status'] query = m['query'] start_at = m['start_at'] end_at = m['end_at'] cpu_time = m['cpu_time'] result_size = m['result_size'] # compressed result size in msgpack.gz format result_url = m['result'] priority = m['priority'] retry_limit = m['retry_limit'] result << [job_id, type, status, query, start_at, end_at, cpu_time, result_size, result_url, priority, retry_limit, nil, database] } return result end # => (type:Symbol, status:String, result:String, url:String, result:String) def show_job(job_id) # use v3/job/status instead of v3/job/show to poll finish of a job code, body, res = get("/v3/job/show/#{e job_id}") if code != "200" raise_error("Show job failed", res) end js = checked_json(body, %w[status]) # TODO debug type = (js['type'] || '?').to_sym # TODO database = js['database'] query = js['query'] status = js['status'] debug = js['debug'] url = js['url'] start_at = js['start_at'] end_at = js['end_at'] cpu_time = js['cpu_time'] result_size = js['result_size'] # compressed result size in msgpack.gz format result = js['result'] # result target URL hive_result_schema = (js['hive_result_schema'] || '') if hive_result_schema.empty? hive_result_schema = nil else begin hive_result_schema = JSON.parse(hive_result_schema) rescue JSON::ParserError => e # this is a workaround for a Known Limitation in the Pig Engine which does not set a default, auto-generated # column name for anonymous columns (such as the ones that are generated from UDF like COUNT or SUM). # The schema will contain 'nil' for the name of those columns and that breaks the JSON parser since it violates # the JSON syntax standard. if type == :pig and hive_result_schema !~ /[\{\}]/ begin # NOTE: this works because a JSON 2 dimensional array is the same as a Ruby one. # Any change in the format for the hive_result_schema output may cause a syntax error, in which case # this lame attempt at fixing the problem will fail and we will be raising the original JSON exception hive_result_schema = eval(hive_result_schema) rescue SyntaxError => ignored_e raise e end hive_result_schema.each_with_index {|col_schema, idx| if col_schema[0].nil? col_schema[0] = "_col#{idx}" end } else raise e end end end priority = js['priority'] retry_limit = js['retry_limit'] return [type, query, status, url, debug, start_at, end_at, cpu_time, result_size, result, hive_result_schema, priority, retry_limit, nil, database] end def job_status(job_id) code, body, res = get("/v3/job/status/#{e job_id}") if code != "200" raise_error("Get job status failed", res) end js = checked_json(body, %w[status]) return js['status'] end def job_result(job_id) code, body, res = get("/v3/job/result/#{e job_id}", {'format'=>'msgpack'}) if code != "200" raise_error("Get job result failed", res) end result = [] MessagePack::Unpacker.new.feed_each(body) {|row| result << row } return result end # block is optional and must accept 1 parameter def job_result_format(job_id, format, io=nil, &block) if io code, body, res = get("/v3/job/result/#{e job_id}", {'format'=>format}) {|res| if res.code != "200" raise_error("Get job result failed", res) end if ce = res.header['Content-Encoding'] res.extend(DeflateReadBodyMixin) res.gzip = true if ce == 'gzip' else res.extend(DirectReadBodyMixin) end res.extend(DirectReadBodyMixin) if ce = res.header['Content-Encoding'] if ce == 'gzip' infl = Zlib::Inflate.new(Zlib::MAX_WBITS + 16) else infl = Zlib::Inflate.new end end total_compr_size = 0 res.each_fragment {|fragment| total_compr_size += fragment.size # uncompressed if the 'Content-Enconding' header is set in response fragment = infl.inflate(fragment) if ce io.write(fragment) block.call(total_compr_size) if block_given? } } nil else code, body, res = get("/v3/job/result/#{e job_id}", {'format'=>format}) if res.code != "200" raise_error("Get job result failed", res) end body end end # block is optional and must accept 1 argument def job_result_each(job_id, &block) get("/v3/job/result/#{e job_id}", {'format'=>'msgpack'}) {|res| if res.code != "200" raise_error("Get job result failed", res) end # default to decompressing the response since format is fixed to 'msgpack' res.extend(DeflateReadBodyMixin) res.gzip = (res.header['Content-Encoding'] == 'gzip') upkr = MessagePack::Unpacker.new res.each_fragment {|inflated_fragment| upkr.feed_each(inflated_fragment, &block) } } nil end # block is optional and must accept 1 argument def job_result_each_with_compr_size(job_id, &block) get("/v3/job/result/#{e job_id}", {'format'=>'msgpack'}) {|res| if res.code != "200" raise_error("Get job result failed", res) end res.extend(DirectReadBodyMixin) if res.header['Content-Encoding'] == 'gzip' infl = Zlib::Inflate.new(Zlib::MAX_WBITS + 16) else infl = Zlib::Inflate.new end upkr = MessagePack::Unpacker.new begin total_compr_size = 0 res.each_fragment {|fragment| total_compr_size += fragment.size upkr.feed_each(infl.inflate(fragment)) {|unpacked| block.call(unpacked, total_compr_size) if block_given? } } ensure infl.close end } nil end def job_result_raw(job_id, format) code, body, res = get("/v3/job/result/#{e job_id}", {'format'=>format}) if code != "200" raise_error("Get job result failed", res) end return body end def kill(job_id) code, body, res = post("/v3/job/kill/#{e job_id}") if code != "200" raise_error("Kill job failed", res) end js = checked_json(body, %w[]) former_status = js['former_status'] return former_status end # => jobId:String def hive_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={}) query(q, :hive, db, result_url, priority, retry_limit, opts) end # => jobId:String def pig_query(q, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={}) query(q, :pig, db, result_url, priority, retry_limit, opts) end # => jobId:String def query(q, type=:hive, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={}) params = {'query' => q}.merge(opts) params['result'] = result_url if result_url params['priority'] = priority if priority params['retry_limit'] = retry_limit if retry_limit code, body, res = post("/v3/job/issue/#{type}/#{e db}", params) if code != "200" raise_error("Query failed", res) end js = checked_json(body, %w[job_id]) return js['job_id'].to_s end end end