src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.11 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.2.0

- old
+ new

@@ -9,10 +9,12 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.concurrent.TimeoutException; + +import com.google.api.services.bigquery.model.*; import com.google.common.base.Optional; import java.security.GeneralSecurityException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.core.type.TypeReference; @@ -22,20 +24,10 @@ import org.slf4j.Logger; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.Bigquery.Tables; import com.google.api.services.bigquery.Bigquery.Jobs.Insert; -import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobConfiguration; -import com.google.api.services.bigquery.model.JobConfigurationLoad; -import com.google.api.services.bigquery.model.JobStatistics; -import com.google.api.services.bigquery.model.JobReference; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.googleapis.media.MediaHttpUploader; import com.google.api.client.googleapis.media.MediaHttpUploaderProgressListener; public class BigqueryWriter @@ -188,10 +180,48 @@ } else { getJobStatusUntilDone(project, jobRef); } } + public void replaceTable(String project, String dataset, String oldTable, String newTable) + throws TimeoutException, JobFailedException, IOException + { + copyTable(project, dataset, newTable, oldTable, false); + } + + public void copyTable(String project, String dataset, String fromTable, String toTable, boolean append) + throws TimeoutException, JobFailedException, IOException + { + log.info(String.format("Copy Job preparing... project:%s dataset:%s from:%s to:%s", project, dataset, fromTable, toTable)); + + Job job = new Job(); + JobReference jobRef = null; + JobConfiguration jobConfig = new JobConfiguration().setCopy(setCopyConfig(project, dataset, fromTable, toTable, append)); + job.setConfiguration(jobConfig); + Insert insert = bigQueryClient.jobs().insert(project, job); + insert.setProjectId(project); + insert.setDisableGZipContent(true); + + try { + jobRef = insert.execute().getJobReference(); + } catch (IllegalStateException ex) { + throw new JobFailedException(ex.getMessage()); + } + log.info(String.format("Job executed. job id:[%s]", jobRef.getJobId())); + getJobStatusUntilDone(project, jobRef); + } + + public void deleteTable(String project, String dataset, String table) throws IOException { + try { + Tables.Delete delete = bigQueryClient.tables().delete(project, dataset, table); + delete.execute(); + log.info(String.format("Table deleted. project:%s dataset:%s table:%s", delete.getProjectId(), delete.getDatasetId(), delete.getTableId())); + } catch (GoogleJsonResponseException ex) { + log.warn(ex.getMessage()); + } + } + private JobConfigurationLoad setLoadConfig(String project, String dataset, String table) { JobConfigurationLoad config = new JobConfigurationLoad(); config.setAllowQuotedNewlines(allowQuotedNewlines) .setEncoding(encoding) @@ -209,9 +239,24 @@ config.setCreateDisposition("CREATE_IF_NEEDED"); log.info(String.format("table:[%s] will be create if not exists", table)); } else { config.setCreateDisposition("CREATE_NEVER"); } + return config; + } + + private JobConfigurationTableCopy setCopyConfig(String project, String dataset, String fromTable, String toTable, boolean append) + { + JobConfigurationTableCopy config = new JobConfigurationTableCopy(); + config.setSourceTable(createTableReference(project, dataset, fromTable)) + .setDestinationTable(createTableReference(project, dataset, toTable)); + + if (append) { + config.setWriteDisposition("WRITE_APPEND"); + } else { + config.setWriteDisposition("WRITE_TRUNCATE"); + } + return config; } private String createJobId(ImmutableList<String> elements) throws NoSuchAlgorithmException, IOException {