src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.0 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.1

- old
+ new

@@ -88,10 +88,14 @@ @Config("paths") @ConfigDefault("[]") List<String> getFiles(); void setFiles(List<String> files); + @Config("max_connection_retry") + @ConfigDefault("10") // 10 times retry to connect GCS server if failed. + int getMaxConnectionRetry(); + @ConfigInject BufferAllocator getBufferAllocator(); } private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class); @@ -196,11 +200,11 @@ protected Storage newGcsClient(final PluginTask task, final GcsAuthentication auth) { Storage client = null; try { - client = auth.getGcsClient(task.getBucket()); + client = auth.getGcsClient(task.getBucket(), task.getMaxConnectionRetry()); } catch (IOException ex) { throw new ConfigException(ex); } @@ -301,24 +305,26 @@ { private final Logger log = Exec.getLogger(GcsInputStreamReopener.class); private final Storage client; private final String bucket; private final String key; + private final int maxConnectionRetry; - public GcsInputStreamReopener(Storage client, String bucket, String key) + public GcsInputStreamReopener(Storage client, String bucket, String key, int maxConnectionRetry) { this.client = client; this.bucket = bucket; this.key = key; + this.maxConnectionRetry = maxConnectionRetry; } @Override public InputStream reopen(final long offset, final Exception closedCause) throws IOException { try { return retryExecutor() - .withRetryLimit(3) + .withRetryLimit(maxConnectionRetry) .withInitialRetryWait(500) .withMaxRetryWait(30 * 1000) .runInterruptible(new Retryable<InputStream>() { @Override public InputStream call() throws InterruptedIOException, IOException @@ -393,17 +399,19 @@ implements InputStreamFileInput.Provider { private final Storage client; private final String bucket; private final String key; + private final int maxConnectionRetry; private boolean opened = false; public SingleFileProvider(PluginTask task, int taskIndex) { this.client = newGcsClient(task, newGcsAuth(task)); this.bucket = task.getBucket(); this.key = task.getFiles().get(taskIndex); + this.maxConnectionRetry = task.getMaxConnectionRetry(); } @Override public InputStream openNext() throws IOException { @@ -411,10 +419,10 @@ return null; } opened = true; Storage.Objects.Get getObject = client.objects().get(bucket, key); - return new ResumableInputStream(getObject.executeMediaAsInputStream(), new GcsInputStreamReopener(client, bucket, key)); + return new ResumableInputStream(getObject.executeMediaAsInputStream(), new GcsInputStreamReopener(client, bucket, key, maxConnectionRetry)); } @Override public void close() {