src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.11 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.2.0
- old
+ new
@@ -9,10 +9,12 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.TimeoutException;
+
+import com.google.api.services.bigquery.model.*;
import com.google.common.base.Optional;
import java.security.GeneralSecurityException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -22,20 +24,10 @@
import org.slf4j.Logger;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Tables;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.media.MediaHttpUploader;
import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener;
public class BigqueryWriter
@@ -188,10 +180,48 @@
} else {
getJobStatusUntilDone(project, jobRef);
}
}
+ public void replaceTable(String project, String dataset, String oldTable, String newTable)
+ throws TimeoutException, JobFailedException, IOException
+ {
+ copyTable(project, dataset, newTable, oldTable, false);
+ }
+
+ public void copyTable(String project, String dataset, String fromTable, String toTable, boolean append)
+ throws TimeoutException, JobFailedException, IOException
+ {
+ log.info(String.format("Copy Job preparing... project:%s dataset:%s from:%s to:%s", project, dataset, fromTable, toTable));
+
+ Job job = new Job();
+ JobReference jobRef = null;
+ JobConfiguration jobConfig = new JobConfiguration().setCopy(setCopyConfig(project, dataset, fromTable, toTable, append));
+ job.setConfiguration(jobConfig);
+ Insert insert = bigQueryClient.jobs().insert(project, job);
+ insert.setProjectId(project);
+ insert.setDisableGZipContent(true);
+
+ try {
+ jobRef = insert.execute().getJobReference();
+ } catch (IllegalStateException ex) {
+ throw new JobFailedException(ex.getMessage());
+ }
+ log.info(String.format("Job executed. job id:[%s]", jobRef.getJobId()));
+ getJobStatusUntilDone(project, jobRef);
+ }
+
+ public void deleteTable(String project, String dataset, String table) throws IOException {
+ try {
+ Tables.Delete delete = bigQueryClient.tables().delete(project, dataset, table);
+ delete.execute();
+ log.info(String.format("Table deleted. project:%s dataset:%s table:%s", delete.getProjectId(), delete.getDatasetId(), delete.getTableId()));
+ } catch (GoogleJsonResponseException ex) {
+ log.warn(ex.getMessage());
+ }
+ }
+
private JobConfigurationLoad setLoadConfig(String project, String dataset, String table)
{
JobConfigurationLoad config = new JobConfigurationLoad();
config.setAllowQuotedNewlines(allowQuotedNewlines)
.setEncoding(encoding)
@@ -209,9 +239,24 @@
config.setCreateDisposition("CREATE_IF_NEEDED");
log.info(String.format("table:[%s] will be create if not exists", table));
} else {
config.setCreateDisposition("CREATE_NEVER");
}
+ return config;
+ }
+
+ private JobConfigurationTableCopy setCopyConfig(String project, String dataset, String fromTable, String toTable, boolean append)
+ {
+ JobConfigurationTableCopy config = new JobConfigurationTableCopy();
+ config.setSourceTable(createTableReference(project, dataset, fromTable))
+ .setDestinationTable(createTableReference(project, dataset, toTable));
+
+ if (append) {
+ config.setWriteDisposition("WRITE_APPEND");
+ } else {
+ config.setWriteDisposition("WRITE_TRUNCATE");
+ }
+
return config;
}
private String createJobId(ImmutableList<String> elements) throws NoSuchAlgorithmException, IOException
{