src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.2.1 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.2.2
- old
+ new
@@ -1,42 +1,54 @@
package org.embulk.output;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.FileInputStream;
-import java.io.BufferedInputStream;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.media.MediaHttpUploader;
+import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener;
import com.google.api.client.http.InputStreamContent;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import com.google.api.services.bigquery.model.*;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
+import com.google.api.services.bigquery.Bigquery.Tables;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Optional;
-import java.security.GeneralSecurityException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-
import com.google.common.collect.ImmutableList;
import org.apache.commons.codec.binary.Hex;
import org.embulk.spi.Exec;
import org.slf4j.Logger;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Tables;
-import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.media.MediaHttpUploader;
-import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
public class BigqueryWriter
{
private final Logger log = Exec.getLogger(BigqueryWriter.class);
+ private final String project;
+ private final String dataset;
+ private final String table;
private final boolean autoCreateTable;
private final Optional<String> schemaPath;
+ private final Optional<String> templateTable;
private final TableSchema tableSchema;
private final String sourceFormat;
private final String fieldDelimiter;
private final int maxBadRecords;
private final String encoding;
@@ -49,12 +61,16 @@
private final Bigquery bigQueryClient;
public BigqueryWriter(Builder builder)
throws IOException, GeneralSecurityException
{
+ this.project = builder.project;
+ this.dataset = builder.dataset;
+ this.table = builder.table;
this.autoCreateTable = builder.autoCreateTable;
this.schemaPath = builder.schemaPath;
+ this.templateTable = builder.templateTable;
this.sourceFormat = builder.sourceFormat.toUpperCase();
this.fieldDelimiter = builder.fieldDelimiter;
this.maxBadRecords = builder.maxBadRecords;
this.encoding = builder.encoding.toUpperCase();
this.preventDuplicateInsert = builder.preventDuplicateInsert;
@@ -68,12 +84,19 @@
builder.authMethod, builder.serviceAccountEmail, builder.p12KeyFilePath,
builder.jsonKeyFilePath, builder.applicationName
);
this.bigQueryClient = auth.getBigqueryClient();
+ checkConfig();
+
if (autoCreateTable) {
- this.tableSchema = createTableSchema();
+ if (schemaPath.isPresent()) {
+ this.tableSchema = createTableSchema();
+ }
+ else {
+ this.tableSchema = fetchTableSchema();
+ }
}
else {
this.tableSchema = null;
}
}
@@ -312,10 +335,19 @@
stream.close();
}
}
}
+ public TableSchema fetchTableSchema() throws IOException
+ {
+ String fetchTarget = templateTable.orNull();
+ log.info(String.format("Fetch table schema from project:%s dataset:%s table:%s", project, dataset, fetchTarget));
+ Tables tableRequest = bigQueryClient.tables();
+ Table tableData = tableRequest.get(project, dataset, fetchTarget).execute();
+ return tableData.getSchema();
+ }
+
public boolean isExistTable(String project, String dataset, String table) throws IOException
{
Tables tableRequest = bigQueryClient.tables();
try {
Table tableData = tableRequest.get(project, dataset, table).execute();
@@ -324,22 +356,22 @@
return false;
}
return true;
}
- public void checkConfig(String project, String dataset, String table) throws IOException
+ public void checkConfig() throws IOException
{
if (autoCreateTable) {
- if (!schemaPath.isPresent()) {
- throw new FileNotFoundException("schema_file is empty");
- }
- else {
+ if (schemaPath.isPresent()) {
File file = new File(schemaPath.orNull());
if (!file.exists()) {
throw new FileNotFoundException("Can not load schema file.");
}
}
+ else if (!templateTable.isPresent()) {
+ throw new FileNotFoundException("schema_file or template_table must be present");
+ }
}
else {
if (!isExistTable(project, dataset, table)) {
throw new IOException(String.format("table [%s] is not exists", table));
}
@@ -402,12 +434,16 @@
private final String authMethod;
private Optional<String> serviceAccountEmail;
private Optional<String> p12KeyFilePath;
private Optional<String> jsonKeyFilePath;
private String applicationName;
+ private String project;
+ private String dataset;
+ private String table;
private boolean autoCreateTable;
private Optional<String> schemaPath;
+ private Optional<String> templateTable;
private String sourceFormat;
private String fieldDelimiter;
private int maxBadRecords;
private String encoding;
private boolean preventDuplicateInsert;
@@ -425,18 +461,42 @@
this.p12KeyFilePath = p12KeyFilePath;
this.jsonKeyFilePath = jsonKeyFilePath;
this.applicationName = applicationName;
}
+ public Builder setProject(String project)
+ {
+ this.project = project;
+ return this;
+ }
+
+ public Builder setDataset(String dataset)
+ {
+ this.dataset = dataset;
+ return this;
+ }
+
+ public Builder setTable(String table)
+ {
+ this.table = table;
+ return this;
+ }
+
public Builder setAutoCreateTable(boolean autoCreateTable)
{
this.autoCreateTable = autoCreateTable;
return this;
}
public Builder setSchemaPath(Optional<String> schemaPath)
{
this.schemaPath = schemaPath;
+ return this;
+ }
+
+ public Builder setTemplateTable(Optional<String> templateTable)
+ {
+ this.templateTable = templateTable;
return this;
}
public Builder setSourceFormat(String sourceFormat)
{