require 'net/http' require 'uri' require 'json' require 'addressable/uri' require_relative 'exceptions' module WebHDFS class ClientV1 # This hash table holds command options. OPT_TABLE = {} # internal use only KNOWN_ERRORS = ['LeaseExpiredException'].freeze attr_accessor :host, :port, :username, :doas, :proxy_address, :proxy_port attr_accessor :proxy_user, :proxy_pass attr_accessor :open_timeout # default 30s (in ruby net/http) attr_accessor :read_timeout # default 60s (in ruby net/http) attr_accessor :httpfs_mode attr_accessor :retry_known_errors # default false (not to retry) attr_accessor :retry_times # default 1 (ignored when retry_known_errors is false) attr_accessor :retry_interval # default 1 ([sec], ignored when retry_known_errors is false) attr_accessor :ssl attr_accessor :ssl_ca_file attr_reader :ssl_verify_mode attr_accessor :ssl_cert attr_accessor :ssl_key attr_accessor :ssl_version attr_accessor :kerberos, :kerberos_keytab attr_accessor :http_headers attr_accessor :kerberos_delegation_token SSL_VERIFY_MODES = [:none, :peer] def ssl_verify_mode=(mode) unless SSL_VERIFY_MODES.include? mode raise ArgumentError, "Invalid SSL verify mode #{mode.inspect}" end @ssl_verify_mode = mode end def initialize(host='localhost', port=50070, username=nil, doas=nil, proxy_address=nil, proxy_port=nil, http_headers={}, renew_kerberos_delegation_token_time_hour=nil) @host = host @port = port @username = username @doas = doas @proxy_address = proxy_address @proxy_port = proxy_port @retry_known_errors = false @retry_times = 1 @retry_interval = 1 @httpfs_mode = false @ssl = false @ssl_ca_file = nil @ssl_verify_mode = nil @ssl_cert = nil @ssl_key = nil @ssl_version = nil @kerberos = false @kerberos_keytab = nil @renew_kerberos_delegation_token_time_hour = renew_kerberos_delegation_token_time_hour @kerberos_delegation_token = nil @kerberos_token_updated_at = Time.now @http_headers = http_headers end def should_kerberos_token_updated? 
@kerberos_token_updated_at + (@renew_kerberos_delegation_token_time_hour * 60 * 60) <= Time.now end def get_cached_kerberos_delegation_token(force_renew=nil) return @kerberos_delegation_token if @kerberos_delegation_token && !should_kerberos_token_updated? && !force_renew if !@kerberos_delegation_token || force_renew @kerberos_delegation_token = get_kerberos_delegation_token(@username) @kerberos_token_updated_at = Time.now else renew_kerberos_delegation_token(@kerberos_delegation_token) @kerberos_token_updated_at = Time.now end @kerberos_delegation_token end # curl -i -X PUT "http://:/webhdfs/v1/?op=CREATE # [&overwrite=][&blocksize=][&replication=] # [&permission=][&buffersize=] # [&delegation=]" def create(path, body, options={}) if @httpfs_mode options = options.merge({'data' => 'true'}) end if @renew_kerberos_delegation_token_time_hour options = options.merge('delegation' => get_cached_kerberos_delegation_token) end check_options(options, OPT_TABLE['CREATE']) res = operate_requests('PUT', path, 'CREATE', options, body) res.code == '201' end OPT_TABLE['CREATE'] = ['overwrite', 'blocksize', 'replication', 'permission', 'buffersize', 'data', 'delegation'] # curl -i -X POST "http://:/webhdfs/v1/?op=APPEND # [&buffersize=][&delegation=]" def append(path, body, options={}) if @httpfs_mode options = options.merge({'data' => 'true'}) end if @renew_kerberos_delegation_token_time_hour options = options.merge('delegation' => get_cached_kerberos_delegation_token) end check_options(options, OPT_TABLE['APPEND']) res = operate_requests('POST', path, 'APPEND', options, body) res.code == '200' end OPT_TABLE['APPEND'] = ['buffersize', 'data', 'delegation'] # curl -i -L "http://:/webhdfs/v1/?op=OPEN # [&offset=][&length=][&buffersize=]" def read(path, options={}) check_options(options, OPT_TABLE['OPEN']) res = operate_requests('GET', path, 'OPEN', options) res.body end OPT_TABLE['OPEN'] = ['offset', 'length', 'buffersize'] alias :open :read # curl -i -X PUT 
#   "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS[&permission=<OCTAL>]"
    #
    # Creates the directory +path+.
    #
    # @param options [Hash] accepted keys: OPT_TABLE['MKDIRS'] ('permission')
    # @return the 'boolean' field of the JSON response (via check_success_json)
    def mkdir(path, options={})
      check_options(options, OPT_TABLE['MKDIRS'])
      res = operate_requests('PUT', path, 'MKDIRS', options)
      check_success_json(res, 'boolean')
    end
    OPT_TABLE['MKDIRS'] = ['permission']
    alias :mkdirs :mkdir

    # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"
    #
    # Renames +path+ to +dest+. +dest+ is converted with to_s, so Pathname and
    # Symbol destinations work as well as String. A destination without a
    # leading '/' is made absolute by prefixing one.
    #
    # NOTE(review): OPT_TABLE['RENAME'] is not assigned in this section, so
    # check_options receives nil here; presumably that means "no extra
    # options allowed" (check_options is defined elsewhere) — confirm.
    #
    # @raise [ArgumentError] if +dest+ is nil or empty. Previously nil raised
    #   NoMethodError and "" silently targeted "/".
    # @return the 'boolean' field of the JSON response (via check_success_json)
    def rename(path, dest, options={})
      check_options(options, OPT_TABLE['RENAME'])
      dest = dest.to_s
      raise ArgumentError, "'rename' needs a non-empty destination" if dest.empty?

      dest = "/#{dest}" unless dest.start_with?('/')
      res = operate_requests('PUT', path, 'RENAME', options.merge({'destination' => dest}))
      check_success_json(res, 'boolean')
    end

    # curl -i -X DELETE "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=DELETE
    #                    [&recursive=<true|false>]"
    #
    # @return the 'boolean' field of the JSON response (via check_success_json)
    def delete(path, options={})
      check_options(options, OPT_TABLE['DELETE'])
      res = operate_requests('DELETE', path, 'DELETE', options)
      check_success_json(res, 'boolean')
    end
    OPT_TABLE['DELETE'] = ['recursive']

    # curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS"
    #
    # @return the 'FileStatus' field of the JSON response (via check_success_json)
    def stat(path, options={})
      check_options(options, OPT_TABLE['GETFILESTATUS'])
      res = operate_requests('GET', path, 'GETFILESTATUS', options)
      check_success_json(res, 'FileStatus')
    end
    alias :getfilestatus :stat

    # curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
    #
    # @return the 'FileStatus' entry nested under 'FileStatuses' in the JSON response
    def list(path, options={})
      check_options(options, OPT_TABLE['LISTSTATUS'])
      res = operate_requests('GET', path, 'LISTSTATUS', options)
      check_success_json(res, 'FileStatuses')['FileStatus']
    end
    alias :liststatus :list

    # curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY"
    #
    # @return the 'ContentSummary' field of the JSON response (via check_success_json)
    def content_summary(path, options={})
      check_options(options, OPT_TABLE['GETCONTENTSUMMARY'])
      res = operate_requests('GET', path, 'GETCONTENTSUMMARY', options)
      check_success_json(res, 'ContentSummary')
    end
    alias :getcontentsummary :content_summary

    # curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM"
    #
    # @return the 'FileChecksum' field of the JSON response (via check_success_json)
    def checksum(path, options={})
      check_options(options, OPT_TABLE['GETFILECHECKSUM'])
      res = operate_requests('GET', path, 'GETFILECHECKSUM', options)
      check_success_json(res, 'FileChecksum')
    end
    alias :getfilechecksum :checksum

    # curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY"
# Home directory lookup: always queries the root path '/'.
    #
    # @param options [Hash] validated against OPT_TABLE['GETHOMEDIRECTORY']
    # @return the 'Path' field of the JSON response (via check_success_json)
    def homedir(options={})
      check_options(options, OPT_TABLE['GETHOMEDIRECTORY'])
      res = operate_requests('GET', '/', 'GETHOMEDIRECTORY', options)
      check_success_json(res, 'Path')
    end
    alias :gethomedirectory :homedir

    # curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETTRASHROOT"
    #
    # Trash root lookup; like #homedir it always queries the root path '/'.
    #
    # @return the 'Path' field of the JSON response (via check_success_json)
    def gettrashroot(options={})
      check_options(options, OPT_TABLE['GETTRASHROOT'])
      res = operate_requests('GET', '/', 'GETTRASHROOT', options)
      check_success_json(res, 'Path')
    end

    # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
    #                 [&permission=<OCTAL>]"
    #
    # +mode+ is sent unchanged as the 'permission' parameter. WebHDFS expects
    # an octal string such as '755'.
    #
    # @return [Boolean] true iff the server answered HTTP 200
    def chmod(path, mode, options={})
      check_options(options, OPT_TABLE['SETPERMISSION'])
      res = operate_requests('PUT', path, 'SETPERMISSION', options.merge({'permission' => mode}))
      res.code == '200'
    end
    alias :setpermission :chmod

    # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
    #                 [&owner=<USER>][&group=<GROUP>]"
    #
    # At least one of owner/group must be given, as a String or Symbol key.
    #
    # @raise [ArgumentError] when neither owner nor group is present
    # @return [Boolean] true iff the server answered HTTP 200
    def chown(path, options={})
      check_options(options, OPT_TABLE['SETOWNER'])
      # Accept both String and Symbol keys. any?/|| replaces the former chain
      # of `or`, whose low precedence is an easy trap in conditions.
      has_owner_or_group = %w[owner group].any? { |key| options.key?(key) || options.key?(key.to_sym) }
      raise ArgumentError, "'chown' needs at least one of owner or group" unless has_owner_or_group

      res = operate_requests('PUT', path, 'SETOWNER', options)
      res.code == '200'
    end
    OPT_TABLE['SETOWNER'] = ['owner', 'group']
    alias :setowner :chown

    # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
    #                 [&replication=<SHORT>]"
    #
    # @param replnum [#to_s] replication factor; sent as its string form
    # @return the 'boolean' field of the JSON response (via check_success_json)
    def replication(path, replnum, options={})
      check_options(options, OPT_TABLE['SETREPLICATION'])
      res = operate_requests('PUT', path, 'SETREPLICATION', options.merge({'replication' => replnum.to_s}))
      check_success_json(res, 'boolean')
    end
    alias :setreplication :replication

    # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES
    #                 [&modificationtime=