src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.1 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.2

- old
+ new

@@ -1,87 +1,120 @@
package org.embulk.output;

+import java.io.File;
import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+import com.google.api.client.http.InputStreamContent;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
import java.util.HashMap;
import java.util.IllegalFormatException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
+import com.google.common.base.Throwables;
import java.security.GeneralSecurityException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+
import org.embulk.spi.Exec;
import org.slf4j.Logger;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.DatasetList;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableList;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.media.MediaHttpUploader;
+import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener;
+import com.google.api.client.googleapis.media.MediaHttpUploader.UploadState;
+
public class BigqueryWriter
{
    private final Logger log = Exec.getLogger(BigqueryWriter.class);
    private final String project;
    private final String dataset;
    private final String table;
    private final boolean autoCreateTable;
    private final Optional<String> schemaPath;
-    private final String bucket;
+    private final TableSchema tableSchema;
    private final String sourceFormat;
    private final String fieldDelimiter;
    private final int maxBadrecords;
+    private final String encoding;
    private final long jobStatusMaxPollingTime;
    private final long jobStatusPollingInterval;
    private final boolean isSkipJobResultCheck;
    private final Bigquery bigQueryClient;
-    private final EmbulkBigqueryTask writerTask;

-    public BigqueryWriter(Builder builder) throws IOException, GeneralSecurityException
+    public BigqueryWriter(Builder builder) throws FileNotFoundException, IOException, GeneralSecurityException
    {
        this.project = builder.project;
        this.dataset = builder.dataset;
        this.table = builder.table;
        this.autoCreateTable = builder.autoCreateTable;
        this.schemaPath = builder.schemaPath;
-        this.bucket = builder.bucket;
        this.sourceFormat = builder.sourceFormat.toUpperCase();
        this.fieldDelimiter = builder.fieldDelimiter;
        this.maxBadrecords = builder.maxBadrecords;
+        this.encoding = builder.encoding.toUpperCase();
        this.jobStatusMaxPollingTime = builder.jobStatusMaxPollingTime;
        this.jobStatusPollingInterval = builder.jobStatusPollingInterval;
        this.isSkipJobResultCheck = builder.isSkipJobResultCheck;
        BigqueryAuthentication auth = new BigqueryAuthentication(builder.serviceAccountEmail, builder.p12KeyFilePath, builder.applicationName);
        this.bigQueryClient = auth.getBigqueryClient();
-        this.writerTask = new EmbulkBigqueryTask();
+
+        checkConfig();
+        if (autoCreateTable) {
+            this.tableSchema = createTableSchema(builder.schemaPath);
+        } else {
+            this.tableSchema = null;
+        }
    }

    private String getJobStatus(JobReference jobRef) throws JobFailedException
    {
        try {
            Job job = bigQueryClient.jobs().get(project, jobRef.getJobId()).execute();
-            if (job.getStatus().getErrorResult() != null) {
-                throw new JobFailedException(String.format("Job failed. job id:[%s] reason:[%s] status:[FAILED]", jobRef.getJobId(), job.getStatus().getErrorResult().getMessage()));
+
+            ErrorProto fatalError = job.getStatus().getErrorResult();
+            if (fatalError != null) {
+                throw new JobFailedException(String.format("Job failed. job id:[%s] reason:[%s][%s] status:[FAILED]", jobRef.getJobId(), fatalError.getReason(), fatalError.getMessage()));
            }
+            List<ErrorProto> errors = job.getStatus().getErrors();
+            if (errors != null) {
+                for (ErrorProto error : errors) {
+                    log.warn(String.format("Error: job id:[%s] reason[%s][%s] location:[%s]", jobRef.getJobId(), error.getReason(), error.getMessage(), error.getLocation()));
+                }
+            }
+
            String jobStatus = job.getStatus().getState();
            if (jobStatus.equals("DONE")) {
                JobStatistics statistics = job.getStatistics();
                //log.info(String.format("Job end. create:[%s] end:[%s]", statistics.getCreationTime(), statistics.getEndTime()));
                log.info(String.format("Job statistics [%s]", statistics.getLoad()));
@@ -115,200 +148,152 @@
        } catch (InterruptedException ex) {
            log.warn(ex.getMessage());
        }
    }

-    public void executeJob() throws IOException, TimeoutException, JobFailedException
+    public void executeLoad(String localFilePath) throws GoogleJsonResponseException, IOException, TimeoutException, JobFailedException
    {
-        // TODO: refactor
-        ArrayList<ArrayList<HashMap<String, String>>> taskList = writerTask.createJobList();
-        for (ArrayList<HashMap<String, String>> task : taskList) {
-            Job job = createJob(task);
-            // TODO: multi-threading
-            new EmbulkBigqueryJob(job).call();
-        }
-    }
-
-    private Job createJob(ArrayList<HashMap<String, String>> task)
-    {
        log.info(String.format("Job preparing... project:%s dataset:%s table:%s", project, dataset, table));
        Job job = new Job();
+        JobReference jobRef = null;
        JobConfiguration jobConfig = new JobConfiguration();
        JobConfigurationLoad loadConfig = new JobConfigurationLoad();
        jobConfig.setLoad(loadConfig);
        job.setConfiguration(jobConfig);
        loadConfig.setAllowQuotedNewlines(false);
+        loadConfig.setEncoding(encoding);
+        loadConfig.setMaxBadRecords(maxBadrecords);
        if (sourceFormat.equals("NEWLINE_DELIMITED_JSON")) {
            loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON");
        } else {
            loadConfig.setFieldDelimiter(fieldDelimiter);
        }
+        loadConfig.setWriteDisposition("WRITE_APPEND");
        if (autoCreateTable) {
-            loadConfig.setSchema(getTableSchema());
-            loadConfig.setWriteDisposition("WRITE_EMPTY");
+            loadConfig.setSchema(tableSchema);
            loadConfig.setCreateDisposition("CREATE_IF_NEEDED");
-            log.info(String.format("table:[%s] will be create.", table));
+            log.info(String.format("table:[%s] will be create if not exists", table));
        } else {
-            loadConfig.setWriteDisposition("WRITE_APPEND");
            loadConfig.setCreateDisposition("CREATE_NEVER");
        }
-        loadConfig.setMaxBadRecords(maxBadrecords);
-        List<String> sources = new ArrayList<String>();
-        for (HashMap<String, String> file : task) {
-            String sourceFile;
-            String remotePath = getRemotePath(file.get("remote_path"), file.get("file_name"));
-            sourceFile = "gs://" + remotePath;
-            log.info(String.format("Add source file to job [%s]", sourceFile));
-            sources.add(sourceFile);
-        }
-        loadConfig.setSourceUris(sources);
-        loadConfig.setDestinationTable(getTableReference());
+        loadConfig.setDestinationTable(createTableReference());
-        return job;
+        File file = new File(localFilePath);
+        InputStreamContent mediaContent = new InputStreamContent("application/octet-stream",
+                new BufferedInputStream(
+                        new FileInputStream(file)));
+        mediaContent.setLength(file.length());
+
+        Insert insert = bigQueryClient.jobs().insert(project, job, mediaContent);
+        insert.setProjectId(project);
+        insert.setDisableGZipContent(true);
+
+        // @see https://code.google.com/p/google-api-java-client/wiki/MediaUpload
+        UploadProgressListener listner = new UploadProgressListener();
+        listner.setFileName(localFilePath);
+        insert.getMediaHttpUploader()
+                .setProgressListener(listner)
+                .setDirectUploadEnabled(false);
+
+        try {
+            jobRef = insert.execute().getJobReference();
+        } catch (Exception ex) {
+            log.warn("Job execution was failed. Please check your settings or data... like data matches schema");
+            throw Throwables.propagate(ex);
+        }
+        log.info(String.format("Job executed. job id:[%s] file:[%s]", jobRef.getJobId(), localFilePath));
+        if (isSkipJobResultCheck) {
+            log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId()));
+        } else {
+            getJobStatusUntilDone(jobRef);
+        }
    }

-    private TableReference getTableReference()
+    private TableReference createTableReference()
    {
        return new TableReference()
                .setProjectId(project)
                .setDatasetId(dataset)
                .setTableId(table);
    }

-    private TableSchema getTableSchema()
+    private TableSchema createTableSchema(Optional<String> schemaPath) throws FileNotFoundException, IOException
    {
-        TableSchema tableSchema = new TableSchema();
-        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
-        TableFieldSchema tableField;
-        // TODO import from json file
-        /*
-        for () {
-            tableField = new TableFieldSchema()
-                    .setName(name)
-                    .setType(type);
-            fields.add(tableField);
+        String path = schemaPath.orNull();
+        File file = new File(path);
+        FileInputStream stream = null;
+        try {
+            stream = new FileInputStream(file);
+            ObjectMapper mapper = new ObjectMapper();
+            List<TableFieldSchema> fields = mapper.readValue(stream, new TypeReference<List<TableFieldSchema>>() {});
+            TableSchema tableSchema = new TableSchema().setFields(fields);
+            return tableSchema;
+        } finally {
+            if (stream != null) {
+                stream.close();
+            }
        }
-        */
-
-        tableSchema.setFields(fields);
-        return tableSchema;
    }

-    private String getRemotePath(String remotePath, String fileName)
+    public boolean isExistTable(String tableName) throws IOException
    {
-        String[] pathList = StringUtils.split(remotePath, '/');
-        String path;
-        if (remotePath.isEmpty()) {
-            path = bucket + "/" + fileName;
-        } else {
-            path = bucket + "/" + StringUtils.join(pathList) + "/" + fileName;
+        Tables tableRequest = bigQueryClient.tables();
+        try {
+            Table tableData = tableRequest.get(project, dataset, tableName).execute();
+        } catch (GoogleJsonResponseException ex) {
+            return false;
        }
-        return path;
+        return true;
    }

-    public void addTask(Optional<String> remotePath, String fileName, long fileSize)
+    public void checkConfig() throws FileNotFoundException, IOException
    {
-        writerTask.addTaskFile(remotePath, fileName, fileSize);
-    }
-
-    public ArrayList<HashMap<String, String>> getFileList()
-    {
-        return writerTask.getFileList();
-    }
-
-    private class EmbulkBigqueryJob implements Callable<Void>
-    {
-        private final Job job;
-
-        public EmbulkBigqueryJob(Job job)
-        {
-            this.job = job;
-        }
-
-        public Void call() throws IOException, TimeoutException, JobFailedException
-        {
-            Insert insert = bigQueryClient.jobs().insert(project, job);
-            insert.setProjectId(project);
-            JobReference jobRef = insert.execute().getJobReference();
-            log.info(String.format("Job executed. job id:[%s]", jobRef.getJobId()));
-            if (isSkipJobResultCheck) {
-                log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId()));
+        if (autoCreateTable) {
+            if (!schemaPath.isPresent()) {
+                throw new IOException("schema_path is empty");
            } else {
-                getJobStatusUntilDone(jobRef);
+                File file = new File(schemaPath.orNull());
+                if (!file.exists()) {
+                    throw new FileNotFoundException("Can not load schema file.");
+                }
            }
-            return null;
+        } else {
+            if (!isExistTable(table)) {
+                throw new IOException(String.format("table [%s] is not exists", table));
+            }
        }
    }

-    private class EmbulkBigqueryTask
+    private class UploadProgressListener implements MediaHttpUploaderProgressListener
    {
-        // https://cloud.google.com/bigquery/loading-data-into-bigquery#quota
-        private final long MAX_SIZE_PER_LOAD_JOB = 1000 * 1024 * 1024 * 1024L; // 1TB
-        private final int MAX_NUMBER_OF_FILES_PER_LOAD_JOB = 10000;
+        private String fileName;

-        private final ArrayList<HashMap<String, String>> taskList = new ArrayList<HashMap<String, String>>();
-        private final ArrayList<ArrayList<HashMap<String, String>>> jobList = new ArrayList<ArrayList<HashMap<String, String>>>();
-
-        public void addTaskFile(Optional<String> remotePath, String fileName, long fileSize)
+        @Override
+        public void progressChanged(MediaHttpUploader uploader) throws IOException
        {
-            HashMap<String, String> task = new HashMap<String, String>();
-            if (remotePath.isPresent()) {
-                task.put("remote_path", remotePath.get());
-            } else {
-                task.put("remote_path", "");
+            switch (uploader.getUploadState()) {
+                case INITIATION_STARTED:
+                    log.info(String.format("Upload start [%s]", fileName));
+                    break;
+                case INITIATION_COMPLETE:
+                    //log.info(String.format("Upload initiation completed file [%s]", fileName));
+                    break;
+                case MEDIA_IN_PROGRESS:
+                    log.debug(String.format("Uploading [%s] progress %3.0f", fileName, uploader.getProgress() * 100) + "%");
+                    break;
+                case MEDIA_COMPLETE:
+                    log.info(String.format("Upload completed [%s]", fileName));
            }
-            task.put("file_name", fileName);
-            task.put("file_size", String.valueOf(fileSize));
-            taskList.add(task);
        }

-        public ArrayList<ArrayList<HashMap<String, String>>> createJobList()
+        public void setFileName(String fileName)
        {
-            long currentBundleSize = 0;
-            int currentFileCount = 0;
-            ArrayList<HashMap<String, String>> job = new ArrayList<HashMap<String, String>>();
-            for (HashMap<String, String> task : taskList) {
-                boolean isNeedNextJobList = false;
-                long fileSize = Long.valueOf(task.get("file_size")).longValue();
-
-                if (currentBundleSize + fileSize > MAX_SIZE_PER_LOAD_JOB) {
-                    isNeedNextJobList = true;
-                }
-
-                if (currentFileCount >= MAX_NUMBER_OF_FILES_PER_LOAD_JOB) {
-                    isNeedNextJobList = true;
-                }
-
-                if (isNeedNextJobList) {
-                    jobList.add(job);
-                    job = new ArrayList<HashMap<String, String>>();
-                    job.add(task);
-                    currentBundleSize = 0;
-                } else {
-                    job.add(task);
-                }
-                currentBundleSize += fileSize;
-                currentFileCount++;
-
-                log.debug(String.format("currentBundleSize:%s currentFileCount:%s", currentBundleSize, currentFileCount));
-                log.debug(String.format("fileSize:%s, MAX_SIZE_PER_LOAD_JOB:%s MAX_NUMBER_OF_FILES_PER_LOAD_JOB:%s",
-                        fileSize, MAX_SIZE_PER_LOAD_JOB, MAX_NUMBER_OF_FILES_PER_LOAD_JOB));
-
-            }
-            if (job.size() > 0) {
-                jobList.add(job);
-            }
-            return jobList;
+            this.fileName = fileName;
        }
-
-        public ArrayList<HashMap<String, String>> getFileList()
-        {
-            return taskList;
-        }
    }

    public static class Builder
    {
        private final String serviceAccountEmail;
@@ -317,14 +302,14 @@
        private String project;
        private String dataset;
        private String table;
        private boolean autoCreateTable;
        private Optional<String> schemaPath;
-        private String bucket;
        private String sourceFormat;
        private String fieldDelimiter;
        private int maxBadrecords;
+        private String encoding;
        private int jobStatusMaxPollingTime;
        private int jobStatusPollingInterval;
        private boolean isSkipJobResultCheck;
@@ -373,16 +358,10 @@
        {
            this.schemaPath = schemaPath;
            return this;
        }

-        public Builder setBucket(String bucket)
-        {
-            this.bucket = bucket;
-            return this;
-        }
-
        public Builder setSourceFormat(String sourceFormat)
        {
            this.sourceFormat = sourceFormat;
            return this;
        }
@@ -394,9 +373,15 @@
        }

        public Builder setMaxBadrecords(int maxBadrecords)
        {
            this.maxBadrecords = maxBadrecords;
+            return this;
+        }
+
+        public Builder setEncoding(String encoding)
+        {
+            this.encoding = encoding;
            return this;
        }
        public Builder setJobStatusMaxPollingTime(int jobStatusMaxPollingTime)
        {
\ No newline at end of file
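
Note on the new schema handling in 0.1.2: createTableSchema() deserializes the file named by schema_path into a List<TableFieldSchema> with Jackson, so the file is expected to be a JSON array of BigQuery field definitions. Below is a minimal standalone sketch of that parsing using the same calls as the added code; the schema.json path and the id/name/created_at fields are illustrative examples, not values taken from this diff.

import java.io.File;
import java.io.FileInputStream;
import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;

public class SchemaFileSketch
{
    // Hypothetical schema.json contents:
    // [
    //   {"name": "id",         "type": "INTEGER"},
    //   {"name": "name",       "type": "STRING"},
    //   {"name": "created_at", "type": "TIMESTAMP"}
    // ]
    public static TableSchema load(String path) throws Exception
    {
        // Same deserialization target as createTableSchema() in the new code above
        FileInputStream stream = new FileInputStream(new File(path));
        try {
            ObjectMapper mapper = new ObjectMapper();
            List<TableFieldSchema> fields = mapper.readValue(stream, new TypeReference<List<TableFieldSchema>>() {});
            return new TableSchema().setFields(fields);
        } finally {
            stream.close();
        }
    }
}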
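
For context on how the calling convention changes, here is a rough usage sketch of the 0.1.2 load path: the writer now takes one local file per call and streams it to BigQuery as a resumable media upload, whereas 0.1.1 collected gs:// paths via addTask() and ran executeJob() against a GCS bucket. The Builder constructor argument, build(), and the setters marked "assumed" are inferred from the fields above and are not shown in this diff, so treat this as an illustration rather than the plugin's exact API.

import com.google.common.base.Optional;

public class LoadSketch
{
    public static void main(String[] args) throws Exception
    {
        BigqueryWriter writer = new BigqueryWriter.Builder("xxx@developer.gserviceaccount.com") // assumed constructor argument
                .setP12KeyFilePath("/path/to/key.p12")               // assumed setter
                .setApplicationName("embulk-output-bigquery")        // assumed setter
                .setProject("my-project")                            // assumed setter
                .setDataset("my_dataset")                            // assumed setter
                .setTable("my_table")                                // assumed setter
                .setAutoCreateTable(true)                            // assumed setter
                .setSchemaPath(Optional.of("/path/to/schema.json"))  // assumed setter
                .setSourceFormat("CSV")
                .setFieldDelimiter(",")                              // assumed setter
                .setMaxBadrecords(0)
                .setEncoding("UTF-8")                                // new in 0.1.2
                .setJobStatusMaxPollingTime(3600)
                .setJobStatusPollingInterval(10)
                .setIsSkipJobResultCheck(false)                      // assumed setter
                .build();                                            // assumed build() method

        // Uploads the local file directly and polls the load job until done
        // (unless the result check is skipped); no GCS bucket is involved anymore.
        writer.executeLoad("/tmp/embulk_output.000.00.csv");
    }
}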