lib/td/client/api/bulk_load.rb in td-client-0.8.69 vs lib/td/client/api/bulk_load.rb in td-client-0.8.70
- old
+ new
@@ -1,7 +1,5 @@
-require 'td/client/api/to_hash_struct'
-
class TreasureData::API
module BulkLoad
####
## BulkLoad (Server-side Bulk loader) API
@@ -40,139 +38,114 @@
# "delay": delay_seconds,
# "database": database_name,
# "table": table_name
# }
- ## Resource definitions
-
- class BulkLoad < ToHashStruct.new(:config, :name, :cron, :timezone, :delay, :time_column, :database, :table)
- class BulkLoadSessionConfig < ToHashStruct.new(:type, :access_key_id, :secret_access_key, :endpoint, :bucket, :path_prefix, :parser, :decoders)
- def validate_self
- validate_presence_of :type
- end
- end
-
- model_property :config, BulkLoadSessionConfig
-
- def validate_self
- validate_presence_of :config
- end
- end
-
- class BulkLoadPreview < ToHashStruct.new(:schema, :records)
- end
-
- class Job < ToHashStruct.new(:job_id, :account_id, :type, :status, :cpu_time, :config, :records, :schema, :database, :table, :priority, :created_at, :updated_at, :start_at, :end_at)
- model_property :config, BulkLoad::BulkLoadSessionConfig
- end
-
- ## API definitions
-
LIST = '/v3/bulk_loads'
SESSION = LIST + '/%s'
JOB = SESSION + '/jobs'
- # job: BulkLoad -> BulkLoad
+ # job: Hash -> Hash
def bulk_load_guess(job)
# retry_request = true
path = LIST + '/guess'
- res = api { post(path, job.validate.to_json) }
+ res = api { post(path, job.to_json) }
unless res.ok?
raise_error('BulkLoad configuration guess failed', res)
end
- BulkLoad.from_json(res.body)
+ JSON.load(res.body)
end
- # job: BulkLoad -> BulkLoadPreview
+ # job: Hash -> Hash
def bulk_load_preview(job)
# retry_request = true
path = LIST + '/preview'
- res = api { post(path, job.validate.to_json) }
+ res = api { post(path, job.to_json) }
unless res.ok?
raise_error('BulkLoad job preview failed', res)
end
- BulkLoadPreview.from_json(res.body)
+ JSON.load(res.body)
end
- # job: BulkLoad -> String (job_id)
+ # job: Hash -> String (job_id)
def bulk_load_issue(database, table, job)
type = 'bulkload'
job = job.dup
job['database'] = database
job['table'] = table
path = "/v3/job/issue/#{e type}/#{e database}"
- res = api { post(path, job.validate.to_json) }
+ res = api { post(path, job.to_json) }
unless res.ok?
raise_error('BulkLoad job issuing failed', res)
end
js = checked_json(res.body)
js['job_id'].to_s
end
- # nil -> [BulkLoad]
+ # nil -> [Hash]
def bulk_load_list
res = api { get(LIST) }
unless res.ok?
raise_error("BulkLoadSession list retrieve failed", res)
end
- to_ary(res, BulkLoad)
+ JSON.load(res.body)
end
- # name: String, database: String, table: String, job: BulkLoad -> BulkLoad
+ # name: String, database: String, table: String, job: Hash -> Hash
def bulk_load_create(name, database, table, job, opts = {})
job = job.dup
job['name'] = name
[:cron, :timezone, :delay, :time_column].each do |prop|
job[prop.to_s] = opts[prop] if opts.key?(prop)
end
job['database'] = database
job['table'] = table
- res = api { post(LIST, job.validate.to_json) }
+ res = api { post(LIST, job.to_json) }
unless res.ok?
raise_error("BulkLoadSession: #{name} create failed", res)
end
- BulkLoad.from_json(res.body)
+ JSON.load(res.body)
end
- # name: String -> BulkLoad
+ # name: String -> Hash
def bulk_load_show(name)
path = session_path(name)
res = api { get(path) }
unless res.ok?
raise_error("BulkLoadSession: #{name} retrieve failed", res)
end
- BulkLoad.from_json(res.body)
+ JSON.load(res.body)
end
- # name: String, job: BulkLoad -> BulkLoad
+ # name: String, job: Hash -> Hash
def bulk_load_update(name, job)
path = session_path(name)
- res = api { put(path, job.validate.to_json) }
+ res = api { put(path, job.to_json) }
unless res.ok?
raise_error("BulkLoadSession: #{name} update failed", res)
end
- BulkLoad.from_json(res.body)
+ JSON.load(res.body)
end
- # name: String -> BulkLoad
+ # name: String -> Hash
def bulk_load_delete(name)
path = session_path(name)
res = api { delete(path) }
unless res.ok?
raise_error("BulkLoadSession: #{name} delete failed", res)
end
- BulkLoad.from_json(res.body)
+ JSON.load(res.body)
end
- # name: String -> [Job]
+ # name: String -> [Hash]
def bulk_load_history(name)
path = job_path(name)
res = api { get(path) }
unless res.ok?
raise_error("history of BulkLoadSession: #{name} retrieve failed", res)
end
- to_ary(res, Job)
+ JSON.load(res.body)
end
def bulk_load_run(name, scheduled_time = nil)
path = job_path(name)
opts = {}
@@ -191,15 +164,9 @@
SESSION % e(name)
end
def job_path(name)
JOB % e(name)
- end
-
- def to_ary(res, klass)
- JSON.parse(res.body).map { |bulk_load|
- klass.from_hash(bulk_load)
- }
end
end
end