src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.1 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.2
- old
+ new
@@ -1,87 +1,120 @@
package org.embulk.output;
+import java.io.File;
import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+import com.google.api.client.http.InputStreamContent;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
import java.util.HashMap;
import java.util.IllegalFormatException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
+import com.google.common.base.Throwables;
import java.security.GeneralSecurityException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+
import org.embulk.spi.Exec;
import org.slf4j.Logger;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.DatasetList;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableList;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.media.MediaHttpUploader;
+import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener;
+import com.google.api.client.googleapis.media.MediaHttpUploader.UploadState;
+
public class BigqueryWriter
{
private final Logger log = Exec.getLogger(BigqueryWriter.class);
private final String project;
private final String dataset;
private final String table;
private final boolean autoCreateTable;
private final Optional<String> schemaPath;
- private final String bucket;
+ private final TableSchema tableSchema;
private final String sourceFormat;
private final String fieldDelimiter;
private final int maxBadrecords;
+ private final String encoding;
private final long jobStatusMaxPollingTime;
private final long jobStatusPollingInterval;
private final boolean isSkipJobResultCheck;
private final Bigquery bigQueryClient;
- private final EmbulkBigqueryTask writerTask;
- public BigqueryWriter(Builder builder) throws IOException, GeneralSecurityException
+ public BigqueryWriter(Builder builder) throws FileNotFoundException, IOException, GeneralSecurityException
{
this.project = builder.project;
this.dataset = builder.dataset;
this.table = builder.table;
this.autoCreateTable = builder.autoCreateTable;
this.schemaPath = builder.schemaPath;
- this.bucket = builder.bucket;
this.sourceFormat = builder.sourceFormat.toUpperCase();
this.fieldDelimiter = builder.fieldDelimiter;
this.maxBadrecords = builder.maxBadrecords;
+ this.encoding = builder.encoding.toUpperCase();
this.jobStatusMaxPollingTime = builder.jobStatusMaxPollingTime;
this.jobStatusPollingInterval = builder.jobStatusPollingInterval;
this.isSkipJobResultCheck = builder.isSkipJobResultCheck;
BigqueryAuthentication auth = new BigqueryAuthentication(builder.serviceAccountEmail, builder.p12KeyFilePath, builder.applicationName);
this.bigQueryClient = auth.getBigqueryClient();
- this.writerTask = new EmbulkBigqueryTask();
+
+ checkConfig();
+ if (autoCreateTable) {
+ this.tableSchema = createTableSchema(builder.schemaPath);
+ } else {
+ this.tableSchema = null;
+ }
}
private String getJobStatus(JobReference jobRef) throws JobFailedException
{
try {
Job job = bigQueryClient.jobs().get(project, jobRef.getJobId()).execute();
- if (job.getStatus().getErrorResult() != null) {
- throw new JobFailedException(String.format("Job failed. job id:[%s] reason:[%s] status:[FAILED]", jobRef.getJobId(), job.getStatus().getErrorResult().getMessage()));
+
+ ErrorProto fatalError = job.getStatus().getErrorResult();
+ if (fatalError != null) {
+ throw new JobFailedException(String.format("Job failed. job id:[%s] reason:[%s][%s] status:[FAILED]", jobRef.getJobId(), fatalError.getReason(), fatalError.getMessage()));
}
+ List<ErrorProto> errors = job.getStatus().getErrors();
+ if (errors != null) {
+ for (ErrorProto error : errors) {
+ log.warn(String.format("Error: job id:[%s] reason[%s][%s] location:[%s]", jobRef.getJobId(), error.getReason(), error.getMessage(), error.getLocation()));
+ }
+ }
+
String jobStatus = job.getStatus().getState();
if (jobStatus.equals("DONE")) {
JobStatistics statistics = job.getStatistics();
//log.info(String.format("Job end. create:[%s] end:[%s]", statistics.getCreationTime(), statistics.getEndTime()));
log.info(String.format("Job statistics [%s]", statistics.getLoad()));
@@ -115,200 +148,152 @@
} catch (InterruptedException ex) {
log.warn(ex.getMessage());
}
}
- public void executeJob() throws IOException, TimeoutException, JobFailedException
+ public void executeLoad(String localFilePath) throws GoogleJsonResponseException, IOException, TimeoutException, JobFailedException
{
- // TODO: refactor
- ArrayList<ArrayList<HashMap<String, String>>> taskList = writerTask.createJobList();
- for (ArrayList<HashMap<String, String>> task : taskList) {
- Job job = createJob(task);
- // TODO: multi-threading
- new EmbulkBigqueryJob(job).call();
- }
- }
-
- private Job createJob(ArrayList<HashMap<String, String>> task)
- {
log.info(String.format("Job preparing... project:%s dataset:%s table:%s", project, dataset, table));
Job job = new Job();
+ JobReference jobRef = null;
JobConfiguration jobConfig = new JobConfiguration();
JobConfigurationLoad loadConfig = new JobConfigurationLoad();
jobConfig.setLoad(loadConfig);
job.setConfiguration(jobConfig);
loadConfig.setAllowQuotedNewlines(false);
+ loadConfig.setEncoding(encoding);
+ loadConfig.setMaxBadRecords(maxBadrecords);
if (sourceFormat.equals("NEWLINE_DELIMITED_JSON")) {
loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON");
} else {
loadConfig.setFieldDelimiter(fieldDelimiter);
}
+ loadConfig.setWriteDisposition("WRITE_APPEND");
if (autoCreateTable) {
- loadConfig.setSchema(getTableSchema());
- loadConfig.setWriteDisposition("WRITE_EMPTY");
+ loadConfig.setSchema(tableSchema);
loadConfig.setCreateDisposition("CREATE_IF_NEEDED");
- log.info(String.format("table:[%s] will be create.", table));
+ log.info(String.format("table:[%s] will be create if not exists", table));
} else {
- loadConfig.setWriteDisposition("WRITE_APPEND");
loadConfig.setCreateDisposition("CREATE_NEVER");
}
- loadConfig.setMaxBadRecords(maxBadrecords);
- List<String> sources = new ArrayList<String>();
- for (HashMap<String, String> file : task) {
- String sourceFile;
- String remotePath = getRemotePath(file.get("remote_path"), file.get("file_name"));
- sourceFile = "gs://" + remotePath;
- log.info(String.format("Add source file to job [%s]", sourceFile));
- sources.add(sourceFile);
- }
- loadConfig.setSourceUris(sources);
- loadConfig.setDestinationTable(getTableReference());
+ loadConfig.setDestinationTable(createTableReference());
- return job;
+ File file = new File(localFilePath);
+ InputStreamContent mediaContent = new InputStreamContent("application/octet-stream",
+ new BufferedInputStream(
+ new FileInputStream(file)));
+ mediaContent.setLength(file.length());
+
+ Insert insert = bigQueryClient.jobs().insert(project, job, mediaContent);
+ insert.setProjectId(project);
+ insert.setDisableGZipContent(true);
+
+ // @see https://code.google.com/p/google-api-java-client/wiki/MediaUpload
+ UploadProgressListener listener = new UploadProgressListener();
+ listener.setFileName(localFilePath);
+ insert.getMediaHttpUploader()
+ .setProgressListener(listener)
+ .setDirectUploadEnabled(false);
+
+ try {
+ jobRef = insert.execute().getJobReference();
+ } catch (Exception ex) {
+ log.warn("Job execution was failed. Please check your settings or data... like data matches schema");
+ throw Throwables.propagate(ex);
+ }
+ log.info(String.format("Job executed. job id:[%s] file:[%s]", jobRef.getJobId(), localFilePath));
+ if (isSkipJobResultCheck) {
+ log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId()));
+ } else {
+ getJobStatusUntilDone(jobRef);
+ }
}
- private TableReference getTableReference()
+ private TableReference createTableReference()
{
return new TableReference()
.setProjectId(project)
.setDatasetId(dataset)
.setTableId(table);
}
- private TableSchema getTableSchema()
+ private TableSchema createTableSchema(Optional<String> schemaPath) throws FileNotFoundException, IOException
{
- TableSchema tableSchema = new TableSchema();
- List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
- TableFieldSchema tableField;
- // TODO import from json file
- /*
- for () {
- tableField = new TableFieldSchema()
- .setName(name)
- .setType(type);
- fields.add(tableField);
+ String path = schemaPath.orNull();
+ File file = new File(path);
+ FileInputStream stream = null;
+ try {
+ stream = new FileInputStream(file);
+ ObjectMapper mapper = new ObjectMapper();
+ List<TableFieldSchema> fields = mapper.readValue(stream, new TypeReference<List<TableFieldSchema>>() {});
+ TableSchema tableSchema = new TableSchema().setFields(fields);
+ return tableSchema;
+ } finally {
+ if (stream != null) {
+ stream.close();
+ }
}
- */
-
- tableSchema.setFields(fields);
- return tableSchema;
}
- private String getRemotePath(String remotePath, String fileName)
+ public boolean isExistTable(String tableName) throws IOException
{
- String[] pathList = StringUtils.split(remotePath, '/');
- String path;
- if (remotePath.isEmpty()) {
- path = bucket + "/" + fileName;
- } else {
- path = bucket + "/" + StringUtils.join(pathList) + "/" + fileName;
+ Tables tableRequest = bigQueryClient.tables();
+ try {
+ Table tableData = tableRequest.get(project, dataset, tableName).execute();
+ } catch (GoogleJsonResponseException ex) {
+ return false;
}
- return path;
+ return true;
}
- public void addTask(Optional<String> remotePath, String fileName, long fileSize)
+ public void checkConfig() throws FileNotFoundException, IOException
{
- writerTask.addTaskFile(remotePath, fileName, fileSize);
- }
-
- public ArrayList<HashMap<String, String>> getFileList()
- {
- return writerTask.getFileList();
- }
-
- private class EmbulkBigqueryJob implements Callable<Void>
- {
- private final Job job;
-
- public EmbulkBigqueryJob(Job job)
- {
- this.job = job;
- }
-
- public Void call() throws IOException, TimeoutException, JobFailedException
- {
- Insert insert = bigQueryClient.jobs().insert(project, job);
- insert.setProjectId(project);
- JobReference jobRef = insert.execute().getJobReference();
- log.info(String.format("Job executed. job id:[%s]", jobRef.getJobId()));
- if (isSkipJobResultCheck) {
- log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId()));
+ if (autoCreateTable) {
+ if (!schemaPath.isPresent()) {
+ throw new IOException("schema_path is empty");
} else {
- getJobStatusUntilDone(jobRef);
+ File file = new File(schemaPath.orNull());
+ if (!file.exists()) {
+ throw new FileNotFoundException("Cannot load schema file.");
+ }
}
- return null;
+ } else {
+ if (!isExistTable(table)) {
+ throw new IOException(String.format("table [%s] is not exists", table));
+ }
}
}
- private class EmbulkBigqueryTask
+ private class UploadProgressListener implements MediaHttpUploaderProgressListener
{
- // https://cloud.google.com/bigquery/loading-data-into-bigquery#quota
- private final long MAX_SIZE_PER_LOAD_JOB = 1000 * 1024 * 1024 * 1024L; // 1TB
- private final int MAX_NUMBER_OF_FILES_PER_LOAD_JOB = 10000;
+ private String fileName;
- private final ArrayList<HashMap<String, String>> taskList = new ArrayList<HashMap<String, String>>();
- private final ArrayList<ArrayList<HashMap<String, String>>> jobList = new ArrayList<ArrayList<HashMap<String, String>>>();
-
- public void addTaskFile(Optional<String> remotePath, String fileName, long fileSize)
+ @Override
+ public void progressChanged(MediaHttpUploader uploader) throws IOException
{
- HashMap<String, String> task = new HashMap<String, String>();
- if (remotePath.isPresent()) {
- task.put("remote_path", remotePath.get());
- } else {
- task.put("remote_path", "");
+ switch (uploader.getUploadState()) {
+ case INITIATION_STARTED:
+ log.info(String.format("Upload start [%s]", fileName));
+ break;
+ case INITIATION_COMPLETE:
+ //log.info(String.format("Upload initiation completed file [%s]", fileName));
+ break;
+ case MEDIA_IN_PROGRESS:
+ log.debug(String.format("Uploading [%s] progress %3.0f", fileName, uploader.getProgress() * 100) + "%");
+ break;
+ case MEDIA_COMPLETE:
+ log.info(String.format("Upload completed [%s]", fileName));
}
- task.put("file_name", fileName);
- task.put("file_size", String.valueOf(fileSize));
- taskList.add(task);
}
- public ArrayList<ArrayList<HashMap<String, String>>> createJobList()
+ public void setFileName(String fileName)
{
- long currentBundleSize = 0;
- int currentFileCount = 0;
- ArrayList<HashMap<String, String>> job = new ArrayList<HashMap<String, String>>();
- for (HashMap<String, String> task : taskList) {
- boolean isNeedNextJobList = false;
- long fileSize = Long.valueOf(task.get("file_size")).longValue();
-
- if (currentBundleSize + fileSize > MAX_SIZE_PER_LOAD_JOB) {
- isNeedNextJobList = true;
- }
-
- if (currentFileCount >= MAX_NUMBER_OF_FILES_PER_LOAD_JOB) {
- isNeedNextJobList = true;
- }
-
- if (isNeedNextJobList) {
- jobList.add(job);
- job = new ArrayList<HashMap<String, String>>();
- job.add(task);
- currentBundleSize = 0;
- } else {
- job.add(task);
- }
- currentBundleSize += fileSize;
- currentFileCount++;
-
- log.debug(String.format("currentBundleSize:%s currentFileCount:%s", currentBundleSize, currentFileCount));
- log.debug(String.format("fileSize:%s, MAX_SIZE_PER_LOAD_JOB:%s MAX_NUMBER_OF_FILES_PER_LOAD_JOB:%s",
- fileSize, MAX_SIZE_PER_LOAD_JOB, MAX_NUMBER_OF_FILES_PER_LOAD_JOB));
-
- }
- if (job.size() > 0) {
- jobList.add(job);
- }
- return jobList;
+ this.fileName = fileName;
}
-
- public ArrayList<HashMap<String, String>> getFileList()
- {
- return taskList;
- }
}
public static class Builder
{
private final String serviceAccountEmail;
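Aside: the new createTableSchema() in the hunk above deserializes the file at schema_path directly into a List<TableFieldSchema> with Jackson. A minimal, self-contained sketch of that mapping follows; the two-field schema, the field names, and the class name are illustrative only, not taken from the plugin:

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;

public class SchemaFileSketch
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical contents of the schema_path file: a JSON array of field definitions.
        String exampleJson =
              "[{\"name\": \"id\",   \"type\": \"INTEGER\"},"
            + " {\"name\": \"name\", \"type\": \"STRING\"}]";

        // Same Jackson mapping that createTableSchema() applies to the file's InputStream.
        ObjectMapper mapper = new ObjectMapper();
        List<TableFieldSchema> fields =
                mapper.readValue(exampleJson, new TypeReference<List<TableFieldSchema>>() {});
        TableSchema tableSchema = new TableSchema().setFields(fields);

        System.out.println("Loaded " + tableSchema.getFields().size() + " fields");
    }
}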
@@ -317,14 +302,14 @@
private String project;
private String dataset;
private String table;
private boolean autoCreateTable;
private Optional<String> schemaPath;
- private String bucket;
private String sourceFormat;
private String fieldDelimiter;
private int maxBadrecords;
+ private String encoding;
private int jobStatusMaxPollingTime;
private int jobStatusPollingInterval;
private boolean isSkipJobResultCheck;
@@ -373,16 +358,10 @@
{
this.schemaPath = schemaPath;
return this;
}
- public Builder setBucket(String bucket)
- {
- this.bucket = bucket;
- return this;
- }
-
public Builder setSourceFormat(String sourceFormat)
{
this.sourceFormat = sourceFormat;
return this;
}
@@ -394,9 +373,15 @@
}
public Builder setMaxBadrecords(int maxBadrecords)
{
this.maxBadrecords = maxBadrecords;
+ return this;
+ }
+
+ public Builder setEncoding(String encoding)
+ {
+ this.encoding = encoding;
return this;
}
public Builder setJobStatusMaxPollingTime(int jobStatusMaxPollingTime)
{
\ No newline at end of file
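
Finally, a minimal usage sketch of the writer as it looks after this change. Anything not visible in the hunks above is an assumption: the Builder constructor argument is inferred from the final serviceAccountEmail field, the setters for project, dataset, table and credentials are elided from this diff and therefore omitted, setSchemaPath is assumed to take an Optional<String> matching its field, and all paths and values are hypothetical.

import com.google.common.base.Optional;
import org.embulk.output.BigqueryWriter;

public class BigqueryWriterUsageSketch
{
    public static void main(String[] args) throws Exception
    {
        // Assumed single-argument constructor (only the final serviceAccountEmail field is shown above).
        BigqueryWriter.Builder builder =
                new BigqueryWriter.Builder("loader@example-project.iam.gserviceaccount.com");

        // Setters that appear in this diff; the values are illustrative only.
        builder.setSourceFormat("CSV")
               .setEncoding("UTF-8")                           // new in 0.1.2, forwarded to JobConfigurationLoad
               .setMaxBadrecords(0)
               .setSchemaPath(Optional.of("/tmp/schema.json")) // assumed Optional<String>; used with auto_create_table
               .setJobStatusMaxPollingTime(3600);
        // Setters for project, dataset, table, p12 key file, etc. exist in the full Builder
        // but are outside this diff, so they are left out of the sketch.

        // The 0.1.2 constructor runs checkConfig() and, when auto_create_table is set, parses the schema file.
        BigqueryWriter writer = new BigqueryWriter(builder);

        // 0.1.2 uploads the local file directly as a resumable media upload instead of
        // referencing gs:// URIs built by the removed createJob()/getRemotePath() path.
        writer.executeLoad("/tmp/embulk-output-bigquery-000.00.csv");
    }
}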