src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.3 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.4

- old
+ new

@@ -4,18 +4,22 @@ import java.io.IOException; import java.io.FileNotFoundException; import java.io.FileInputStream; import java.io.BufferedInputStream; import com.google.api.client.http.InputStreamContent; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.concurrent.TimeoutException; import com.google.common.base.Optional; +import com.google.api.client.util.Base64; import com.google.common.base.Throwables; import java.security.GeneralSecurityException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.codec.binary.Hex; import org.embulk.spi.Exec; import org.slf4j.Logger; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.Bigquery.Tables; @@ -46,10 +50,11 @@ private final TableSchema tableSchema; private final String sourceFormat; private final String fieldDelimiter; private final int maxBadrecords; private final String encoding; + private final boolean preventDuplicateInsert; private final long jobStatusMaxPollingTime; private final long jobStatusPollingInterval; private final boolean isSkipJobResultCheck; private final Bigquery bigQueryClient; @@ -62,10 +67,11 @@ this.schemaPath = builder.schemaPath; this.sourceFormat = builder.sourceFormat.toUpperCase(); this.fieldDelimiter = builder.fieldDelimiter; this.maxBadrecords = builder.maxBadrecords; this.encoding = builder.encoding.toUpperCase(); + this.preventDuplicateInsert = builder.preventDuplicateInsert; this.jobStatusMaxPollingTime = builder.jobStatusMaxPollingTime; this.jobStatusPollingInterval = builder.jobStatusPollingInterval; this.isSkipJobResultCheck = builder.isSkipJobResultCheck; BigqueryAuthentication auth = new BigqueryAuthentication(builder.authMethod, builder.serviceAccountEmail, builder.p12KeyFilePath, builder.applicationName); @@ -89,11 +95,11 @@ throw new JobFailedException(String.format("Job failed. job id:[%s] reason:[%s][%s] status:[FAILED]", jobRef.getJobId(), fatalError.getReason(), fatalError.getMessage())); } List<ErrorProto> errors = job.getStatus().getErrors(); if (errors != null) { for (ErrorProto error : errors) { - log.warn(String.format("Error: job id:[%s] reason[%s][%s] location:[%s]", jobRef.getJobId(), error.getReason(), error.getMessage(), error.getLocation())); + log.error(String.format("Error: job id:[%s] reason[%s][%s] location:[%s]", jobRef.getJobId(), error.getReason(), error.getMessage(), error.getLocation())); } } String jobStatus = job.getStatus().getState(); if (jobStatus.equals("DONE")) { @@ -130,21 +136,28 @@ } catch (InterruptedException ex) { log.warn(ex.getMessage()); } } - public void executeLoad(String localFilePath) throws GoogleJsonResponseException, IOException, TimeoutException, JobFailedException + public void executeLoad(String localFilePath) throws GoogleJsonResponseException, NoSuchAlgorithmException, + TimeoutException, JobFailedException, IOException { log.info(String.format("Job preparing... project:%s dataset:%s table:%s", project, dataset, table)); Job job = new Job(); - JobReference jobRef = null; + JobReference jobRef = new JobReference(); JobConfiguration jobConfig = new JobConfiguration(); JobConfigurationLoad loadConfig = new JobConfigurationLoad(); jobConfig.setLoad(loadConfig); job.setConfiguration(jobConfig); + if (preventDuplicateInsert) { + String jobId = createJobId(localFilePath); + jobRef.setJobId(jobId); + job.setJobReference(jobRef); + } + loadConfig.setAllowQuotedNewlines(false); loadConfig.setEncoding(encoding); loadConfig.setMaxBadRecords(maxBadrecords); if (sourceFormat.equals("NEWLINE_DELIMITED_JSON")) { loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON"); @@ -179,22 +192,40 @@ .setProgressListener(listner) .setDirectUploadEnabled(false); try { jobRef = insert.execute().getJobReference(); - } catch (Exception ex) { - log.warn("Job execution was failed. Please check your settings or data... like data matches schema"); - throw Throwables.propagate(ex); + } catch (IllegalStateException ex) { + throw new JobFailedException(ex.getMessage()); } log.info(String.format("Job executed. job id:[%s] file:[%s]", jobRef.getJobId(), localFilePath)); if (isSkipJobResultCheck) { log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId())); } else { getJobStatusUntilDone(jobRef); } } + private String createJobId(String localFilePath) throws NoSuchAlgorithmException, IOException + { + StringBuilder sb = new StringBuilder(); + sb.append(getLocalMd5hash(localFilePath)); + sb.append(dataset); + sb.append(table); + sb.append(tableSchema); + sb.append(sourceFormat); + sb.append(fieldDelimiter); + sb.append(maxBadrecords); + sb.append(encoding); + + MessageDigest md = MessageDigest.getInstance("MD5"); + String str = new String(sb); + byte[] digest = md.digest(str.getBytes()); + String hash = new String(Hex.encodeHex(digest)); + return "embulk_job_" + hash; + } + private TableReference createTableReference() { return new TableReference() .setProjectId(project) .setDatasetId(dataset) @@ -246,10 +277,32 @@ throw new IOException(String.format("table [%s] is not exists", table)); } } } + private String getLocalMd5hash(String filePath) throws NoSuchAlgorithmException, IOException + { + FileInputStream stream = null; + try { + stream = new FileInputStream(filePath); + MessageDigest digest = MessageDigest.getInstance("MD5"); + + byte[] bytesBuffer = new byte[1024]; + int bytesRead = -1; + + while ((bytesRead = stream.read(bytesBuffer)) != -1) { + digest.update(bytesBuffer, 0, bytesRead); + } + byte[] hashedBytes = digest.digest(); + + byte[] encoded = (hashedBytes); + return new String(encoded); + } finally { + stream.close(); + } + } + private class UploadProgressListener implements MediaHttpUploaderProgressListener { private String fileName; @Override @@ -289,10 +342,11 @@ private Optional<String> schemaPath; private String sourceFormat; private String fieldDelimiter; private int maxBadrecords; private String encoding; + private boolean preventDuplicateInsert; private int jobStatusMaxPollingTime; private int jobStatusPollingInterval; private boolean isSkipJobResultCheck; public Builder(String authMethod) @@ -370,10 +424,16 @@ { this.encoding = encoding; return this; } + public Builder setPreventDuplicateInsert(boolean preventDuplicateInsert) + { + this.preventDuplicateInsert = preventDuplicateInsert; + return this; + } + public Builder setJobStatusMaxPollingTime(int jobStatusMaxPollingTime) { this.jobStatusMaxPollingTime = jobStatusMaxPollingTime; return this; } @@ -394,10 +454,10 @@ { return new BigqueryWriter(this); } } - public class JobFailedException extends Exception + public class JobFailedException extends RuntimeException { public JobFailedException(String message) { super(message); } } \ No newline at end of file