src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.2.1 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.2.2

- old
+ new

@@ -1,42 +1,54 @@ package org.embulk.output; -import java.io.File; -import java.io.IOException; -import java.io.FileNotFoundException; -import java.io.FileInputStream; -import java.io.BufferedInputStream; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.media.MediaHttpUploader; +import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener; import com.google.api.client.http.InputStreamContent; - -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.List; -import java.util.concurrent.TimeoutException; - -import com.google.api.services.bigquery.model.*; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.Bigquery.Jobs.Insert; +import com.google.api.services.bigquery.Bigquery.Tables; +import com.google.api.services.bigquery.model.ErrorProto; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfiguration; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobConfigurationTableCopy; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Optional; -import java.security.GeneralSecurityException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.type.TypeReference; - import com.google.common.collect.ImmutableList; import org.apache.commons.codec.binary.Hex; import org.embulk.spi.Exec; import org.slf4j.Logger; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Tables; -import com.google.api.services.bigquery.Bigquery.Jobs.Insert; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.media.MediaHttpUploader; -import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.concurrent.TimeoutException; + public class BigqueryWriter { private final Logger log = Exec.getLogger(BigqueryWriter.class); + private final String project; + private final String dataset; + private final String table; private final boolean autoCreateTable; private final Optional<String> schemaPath; + private final Optional<String> templateTable; private final TableSchema tableSchema; private final String sourceFormat; private final String fieldDelimiter; private final int maxBadRecords; private final String encoding; @@ -49,12 +61,16 @@ private final Bigquery bigQueryClient; public BigqueryWriter(Builder builder) throws IOException, GeneralSecurityException { + this.project = builder.project; + this.dataset = builder.dataset; + this.table = builder.table; this.autoCreateTable = builder.autoCreateTable; this.schemaPath = builder.schemaPath; + this.templateTable = builder.templateTable; this.sourceFormat = builder.sourceFormat.toUpperCase(); this.fieldDelimiter = builder.fieldDelimiter; this.maxBadRecords = builder.maxBadRecords; this.encoding = builder.encoding.toUpperCase(); this.preventDuplicateInsert = builder.preventDuplicateInsert; @@ -68,12 +84,19 @@ builder.authMethod, builder.serviceAccountEmail, builder.p12KeyFilePath, builder.jsonKeyFilePath, builder.applicationName ); this.bigQueryClient = auth.getBigqueryClient(); + checkConfig(); + if (autoCreateTable) { - this.tableSchema = createTableSchema(); + if (schemaPath.isPresent()) { + this.tableSchema = createTableSchema(); + } + else { + this.tableSchema = fetchTableSchema(); + } } else { this.tableSchema = null; } } @@ -312,10 +335,19 @@ stream.close(); } } } + public TableSchema fetchTableSchema() throws IOException + { + String fetchTarget = templateTable.orNull(); + log.info(String.format("Fetch table schema from project:%s dataset:%s table:%s", project, dataset, fetchTarget)); + Tables tableRequest = bigQueryClient.tables(); + Table tableData = tableRequest.get(project, dataset, fetchTarget).execute(); + return tableData.getSchema(); + } + public boolean isExistTable(String project, String dataset, String table) throws IOException { Tables tableRequest = bigQueryClient.tables(); try { Table tableData = tableRequest.get(project, dataset, table).execute(); @@ -324,22 +356,22 @@ return false; } return true; } - public void checkConfig(String project, String dataset, String table) throws IOException + public void checkConfig() throws IOException { if (autoCreateTable) { - if (!schemaPath.isPresent()) { - throw new FileNotFoundException("schema_file is empty"); - } - else { + if (schemaPath.isPresent()) { File file = new File(schemaPath.orNull()); if (!file.exists()) { throw new FileNotFoundException("Can not load schema file."); } } + else if (!templateTable.isPresent()) { + throw new FileNotFoundException("schema_file or template_table must be present"); + } } else { if (!isExistTable(project, dataset, table)) { throw new IOException(String.format("table [%s] is not exists", table)); } @@ -402,12 +434,16 @@ private final String authMethod; private Optional<String> serviceAccountEmail; private Optional<String> p12KeyFilePath; private Optional<String> jsonKeyFilePath; private String applicationName; + private String project; + private String dataset; + private String table; private boolean autoCreateTable; private Optional<String> schemaPath; + private Optional<String> templateTable; private String sourceFormat; private String fieldDelimiter; private int maxBadRecords; private String encoding; private boolean preventDuplicateInsert; @@ -425,18 +461,42 @@ this.p12KeyFilePath = p12KeyFilePath; this.jsonKeyFilePath = jsonKeyFilePath; this.applicationName = applicationName; } + public Builder setProject(String project) + { + this.project = project; + return this; + } + + public Builder setDataset(String dataset) + { + this.dataset = dataset; + return this; + } + + public Builder setTable(String table) + { + this.table = table; + return this; + } + public Builder setAutoCreateTable(boolean autoCreateTable) { this.autoCreateTable = autoCreateTable; return this; } public Builder setSchemaPath(Optional<String> schemaPath) { this.schemaPath = schemaPath; + return this; + } + + public Builder setTemplateTable(Optional<String> templateTable) + { + this.templateTable = templateTable; return this; } public Builder setSourceFormat(String sourceFormat) {