src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.0 vs src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java in embulk-input-gcs-0.2.1
- old
+ new
@@ -88,10 +88,14 @@
@Config("paths")
@ConfigDefault("[]")
List<String> getFiles();
void setFiles(List<String> files);
+ @Config("max_connection_retry")
+ @ConfigDefault("10") // 10 times retry to connect GCS server if failed.
+ int getMaxConnectionRetry();
+
@ConfigInject
BufferAllocator getBufferAllocator();
}
private static final Logger log = Exec.getLogger(GcsFileInputPlugin.class);
@@ -196,11 +200,11 @@
protected Storage newGcsClient(final PluginTask task, final GcsAuthentication auth)
{
Storage client = null;
try {
- client = auth.getGcsClient(task.getBucket());
+ client = auth.getGcsClient(task.getBucket(), task.getMaxConnectionRetry());
}
catch (IOException ex) {
throw new ConfigException(ex);
}
@@ -301,24 +305,26 @@
{
private final Logger log = Exec.getLogger(GcsInputStreamReopener.class);
private final Storage client;
private final String bucket;
private final String key;
+ private final int maxConnectionRetry;
- public GcsInputStreamReopener(Storage client, String bucket, String key)
+ public GcsInputStreamReopener(Storage client, String bucket, String key, int maxConnectionRetry)
{
this.client = client;
this.bucket = bucket;
this.key = key;
+ this.maxConnectionRetry = maxConnectionRetry;
}
@Override
public InputStream reopen(final long offset, final Exception closedCause) throws IOException
{
try {
return retryExecutor()
- .withRetryLimit(3)
+ .withRetryLimit(maxConnectionRetry)
.withInitialRetryWait(500)
.withMaxRetryWait(30 * 1000)
.runInterruptible(new Retryable<InputStream>() {
@Override
public InputStream call() throws InterruptedIOException, IOException
@@ -393,17 +399,19 @@
implements InputStreamFileInput.Provider
{
private final Storage client;
private final String bucket;
private final String key;
+ private final int maxConnectionRetry;
private boolean opened = false;
public SingleFileProvider(PluginTask task, int taskIndex)
{
this.client = newGcsClient(task, newGcsAuth(task));
this.bucket = task.getBucket();
this.key = task.getFiles().get(taskIndex);
+ this.maxConnectionRetry = task.getMaxConnectionRetry();
}
@Override
public InputStream openNext() throws IOException
{
@@ -411,10 +419,10 @@
return null;
}
opened = true;
Storage.Objects.Get getObject = client.objects().get(bucket, key);
- return new ResumableInputStream(getObject.executeMediaAsInputStream(), new GcsInputStreamReopener(client, bucket, key));
+ return new ResumableInputStream(getObject.executeMediaAsInputStream(), new GcsInputStreamReopener(client, bucket, key, maxConnectionRetry));
}
@Override
public void close()
{