src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.2.0 vs src/main/java/org/embulk/output/BigqueryWriter.java in embulk-output-bigquery-0.2.1

- old
+ new

@@ -70,11 +70,12 @@ ); this.bigQueryClient = auth.getBigqueryClient(); if (autoCreateTable) { this.tableSchema = createTableSchema(); - } else { + } + else { this.tableSchema = null; } } private String getJobStatus(String project, JobReference jobRef) throws JobFailedException @@ -98,11 +99,12 @@ if (jobStatus.equals("DONE")) { JobStatistics statistics = job.getStatistics(); log.info(String.format("Job statistics [%s]", statistics.getLoad())); } return jobStatus; - } catch (IOException ex) { + } + catch (IOException ex) { log.warn(ex.getMessage()); return "UNKNOWN"; } } @@ -116,18 +118,21 @@ String jobStatus = getJobStatus(project, jobRef); elapsedTime = System.currentTimeMillis() - startTime; if (jobStatus.equals("DONE")) { log.info(String.format("Job completed successfully. job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, "SUCCESS")); break; - } else if (elapsedTime > jobStatusMaxPollingTime * 1000) { + } + else if (elapsedTime > jobStatusMaxPollingTime * 1000) { throw new TimeoutException(String.format("Checking job status...Timeout. job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, "TIMEOUT")); - } else { + } + else { log.info(String.format("Checking job status... job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, jobStatus)); } Thread.sleep(jobStatusPollingInterval * 1000); } - } catch (InterruptedException ex) { + } + catch (InterruptedException ex) { log.warn(ex.getMessage()); } } public void executeLoad(String project, String dataset, String table, String localFilePath) @@ -169,17 +174,19 @@ .setProgressListener(listner) .setDirectUploadEnabled(false); try { jobRef = insert.execute().getJobReference(); - } catch (IllegalStateException ex) { + } + catch (IllegalStateException ex) { throw new JobFailedException(ex.getMessage()); } log.info(String.format("Job executed. job id:[%s] file:[%s]", jobRef.getJobId(), localFilePath)); if (isSkipJobResultCheck) { log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId())); - } else { + } + else { getJobStatusUntilDone(project, jobRef); } } public void replaceTable(String project, String dataset, String oldTable, String newTable) @@ -201,23 +208,26 @@ insert.setProjectId(project); insert.setDisableGZipContent(true); try { jobRef = insert.execute().getJobReference(); - } catch (IllegalStateException ex) { + } + catch (IllegalStateException ex) { throw new JobFailedException(ex.getMessage()); } log.info(String.format("Job executed. job id:[%s]", jobRef.getJobId())); getJobStatusUntilDone(project, jobRef); } - public void deleteTable(String project, String dataset, String table) throws IOException { + public void deleteTable(String project, String dataset, String table) throws IOException + { try { Tables.Delete delete = bigQueryClient.tables().delete(project, dataset, table); delete.execute(); log.info(String.format("Table deleted. project:%s dataset:%s table:%s", delete.getProjectId(), delete.getDatasetId(), delete.getTableId())); - } catch (GoogleJsonResponseException ex) { + } + catch (GoogleJsonResponseException ex) { log.warn(ex.getMessage()); } } private JobConfigurationLoad setLoadConfig(String project, String dataset, String table) @@ -236,11 +246,12 @@ } if (autoCreateTable) { config.setSchema(tableSchema); config.setCreateDisposition("CREATE_IF_NEEDED"); log.info(String.format("table:[%s] will be create if not exists", table)); - } else { + } + else { config.setCreateDisposition("CREATE_NEVER"); } return config; } @@ -250,11 +261,12 @@ config.setSourceTable(createTableReference(project, dataset, fromTable)) .setDestinationTable(createTableReference(project, dataset, toTable)); if (append) { config.setWriteDisposition("WRITE_APPEND"); - } else { + } + else { config.setWriteDisposition("WRITE_TRUNCATE"); } return config; } @@ -292,11 +304,12 @@ try { stream = new FileInputStream(file); ObjectMapper mapper = new ObjectMapper(); List<TableFieldSchema> fields = mapper.readValue(stream, new TypeReference<List<TableFieldSchema>>() {}); return new TableSchema().setFields(fields); - } finally { + } + finally { if (stream != null) { stream.close(); } } } @@ -304,28 +317,31 @@ public boolean isExistTable(String project, String dataset, String table) throws IOException { Tables tableRequest = bigQueryClient.tables(); try { Table tableData = tableRequest.get(project, dataset, table).execute(); - } catch (GoogleJsonResponseException ex) { + } + catch (GoogleJsonResponseException ex) { return false; } return true; } public void checkConfig(String project, String dataset, String table) throws IOException { if (autoCreateTable) { if (!schemaPath.isPresent()) { throw new FileNotFoundException("schema_file is empty"); - } else { + } + else { File file = new File(schemaPath.orNull()); if (!file.exists()) { throw new FileNotFoundException("Can not load schema file."); } } - } else { + } + else { if (!isExistTable(project, dataset, table)) { throw new IOException(String.format("table [%s] is not exists", table)); } } } @@ -345,11 +361,12 @@ } byte[] hashedBytes = digest.digest(); byte[] encoded = (hashedBytes); return new String(encoded); - } finally { + } + finally { stream.close(); } } private class UploadProgressListener implements MediaHttpUploaderProgressListener @@ -488,10 +505,11 @@ } } public class JobFailedException extends RuntimeException { - public JobFailedException(String message) { + public JobFailedException(String message) + { super(message); } } }