src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.3 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.1.4
- old
+ new
@@ -4,18 +4,22 @@
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.BufferedInputStream;
import com.google.api.client.http.InputStreamContent;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Optional;
+import com.google.api.client.util.Base64;
import com.google.common.base.Throwables;
import java.security.GeneralSecurityException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.codec.binary.Hex;
import org.embulk.spi.Exec;
import org.slf4j.Logger;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Tables;
@@ -46,10 +50,11 @@
private final TableSchema tableSchema;
private final String sourceFormat;
private final String fieldDelimiter;
private final int maxBadrecords;
private final String encoding;
+ private final boolean preventDuplicateInsert;
private final long jobStatusMaxPollingTime;
private final long jobStatusPollingInterval;
private final boolean isSkipJobResultCheck;
private final Bigquery bigQueryClient;
@@ -62,10 +67,11 @@
this.schemaPath = builder.schemaPath;
this.sourceFormat = builder.sourceFormat.toUpperCase();
this.fieldDelimiter = builder.fieldDelimiter;
this.maxBadrecords = builder.maxBadrecords;
this.encoding = builder.encoding.toUpperCase();
+ this.preventDuplicateInsert = builder.preventDuplicateInsert;
this.jobStatusMaxPollingTime = builder.jobStatusMaxPollingTime;
this.jobStatusPollingInterval = builder.jobStatusPollingInterval;
this.isSkipJobResultCheck = builder.isSkipJobResultCheck;
BigqueryAuthentication auth = new BigqueryAuthentication(builder.authMethod, builder.serviceAccountEmail, builder.p12KeyFilePath, builder.applicationName);
@@ -89,11 +95,11 @@
throw new JobFailedException(String.format("Job failed. job id:[%s] reason:[%s][%s] status:[FAILED]", jobRef.getJobId(), fatalError.getReason(), fatalError.getMessage()));
}
List<ErrorProto> errors = job.getStatus().getErrors();
if (errors != null) {
for (ErrorProto error : errors) {
- log.warn(String.format("Error: job id:[%s] reason[%s][%s] location:[%s]", jobRef.getJobId(), error.getReason(), error.getMessage(), error.getLocation()));
+ log.error(String.format("Error: job id:[%s] reason[%s][%s] location:[%s]", jobRef.getJobId(), error.getReason(), error.getMessage(), error.getLocation()));
}
}
String jobStatus = job.getStatus().getState();
if (jobStatus.equals("DONE")) {
@@ -130,21 +136,28 @@
} catch (InterruptedException ex) {
log.warn(ex.getMessage());
}
}
- public void executeLoad(String localFilePath) throws GoogleJsonResponseException, IOException, TimeoutException, JobFailedException
+ public void executeLoad(String localFilePath) throws GoogleJsonResponseException, NoSuchAlgorithmException,
+ TimeoutException, JobFailedException, IOException
{
log.info(String.format("Job preparing... project:%s dataset:%s table:%s", project, dataset, table));
Job job = new Job();
- JobReference jobRef = null;
+ JobReference jobRef = new JobReference();
JobConfiguration jobConfig = new JobConfiguration();
JobConfigurationLoad loadConfig = new JobConfigurationLoad();
jobConfig.setLoad(loadConfig);
job.setConfiguration(jobConfig);
+ if (preventDuplicateInsert) {
+ String jobId = createJobId(localFilePath);
+ jobRef.setJobId(jobId);
+ job.setJobReference(jobRef);
+ }
+
loadConfig.setAllowQuotedNewlines(false);
loadConfig.setEncoding(encoding);
loadConfig.setMaxBadRecords(maxBadrecords);
if (sourceFormat.equals("NEWLINE_DELIMITED_JSON")) {
loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON");
@@ -179,22 +192,40 @@
.setProgressListener(listner)
.setDirectUploadEnabled(false);
try {
jobRef = insert.execute().getJobReference();
- } catch (Exception ex) {
- log.warn("Job execution was failed. Please check your settings or data... like data matches schema");
- throw Throwables.propagate(ex);
+ } catch (IllegalStateException ex) {
+ throw new JobFailedException(ex.getMessage());
}
log.info(String.format("Job executed. job id:[%s] file:[%s]", jobRef.getJobId(), localFilePath));
if (isSkipJobResultCheck) {
log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId()));
} else {
getJobStatusUntilDone(jobRef);
}
}
+ private String createJobId(String localFilePath) throws NoSuchAlgorithmException, IOException
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(getLocalMd5hash(localFilePath));
+ sb.append(dataset);
+ sb.append(table);
+ sb.append(tableSchema);
+ sb.append(sourceFormat);
+ sb.append(fieldDelimiter);
+ sb.append(maxBadrecords);
+ sb.append(encoding);
+
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ String str = new String(sb);
+ byte[] digest = md.digest(str.getBytes());
+ String hash = new String(Hex.encodeHex(digest));
+ return "embulk_job_" + hash;
+ }
+
private TableReference createTableReference()
{
return new TableReference()
.setProjectId(project)
.setDatasetId(dataset)
@@ -246,10 +277,32 @@
throw new IOException(String.format("table [%s] is not exists", table));
}
}
}
+ private String getLocalMd5hash(String filePath) throws NoSuchAlgorithmException, IOException
+ {
+ FileInputStream stream = null;
+ try {
+ stream = new FileInputStream(filePath);
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+
+ byte[] bytesBuffer = new byte[1024];
+ int bytesRead = -1;
+
+ while ((bytesRead = stream.read(bytesBuffer)) != -1) {
+ digest.update(bytesBuffer, 0, bytesRead);
+ }
+ byte[] hashedBytes = digest.digest();
+
+ byte[] encoded = (hashedBytes);
+ return new String(encoded);
+ } finally {
+ stream.close();
+ }
+ }
+
private class UploadProgressListener implements MediaHttpUploaderProgressListener
{
private String fileName;
@Override
@@ -289,10 +342,11 @@
private Optional<String> schemaPath;
private String sourceFormat;
private String fieldDelimiter;
private int maxBadrecords;
private String encoding;
+ private boolean preventDuplicateInsert;
private int jobStatusMaxPollingTime;
private int jobStatusPollingInterval;
private boolean isSkipJobResultCheck;
public Builder(String authMethod)
@@ -370,10 +424,16 @@
{
this.encoding = encoding;
return this;
}
+ public Builder setPreventDuplicateInsert(boolean preventDuplicateInsert)
+ {
+ this.preventDuplicateInsert = preventDuplicateInsert;
+ return this;
+ }
+
public Builder setJobStatusMaxPollingTime(int jobStatusMaxPollingTime)
{
this.jobStatusMaxPollingTime = jobStatusMaxPollingTime;
return this;
}
@@ -394,10 +454,10 @@
{
return new BigqueryWriter(this);
}
}
- public class JobFailedException extends Exception
+ public class JobFailedException extends RuntimeException
{
public JobFailedException(String message) {
super(message);
}
}
\ No newline at end of file