src/main/java/org/embulk/output/jdbc/RetryExecutor.java in embulk-output-jdbc-0.2.3 vs src/main/java/org/embulk/output/jdbc/RetryExecutor.java in embulk-output-jdbc-0.2.4
- old
+ new
@@ -1,105 +1,105 @@
-package org.embulk.output.jdbc;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-public class RetryExecutor
-{
- public static RetryExecutor retryExecutor()
- {
- return new RetryExecutor();
- }
-
- public static abstract class IdempotentOperation<T> implements Callable<T>
- {
- public abstract T call() throws Exception;
-
- public void onRetry(Throwable exception, int retryCount, int retryLimit, int retryWait)
- { }
-
- public void onGiveup(Throwable firstException, Throwable lastException)
- { }
-
- public abstract boolean isRetryableException(Throwable exception);
- }
-
- private int retryLimit = 3;
- private int initialRetryWait = 500;
- private int maxRetryWait = 30*60*1000;
-
- private RetryExecutor()
- { }
-
- public RetryExecutor setRetryLimit(int count)
- {
- this.retryLimit = count;
- return this;
- }
-
- public RetryExecutor setInitialRetryWait(int msec)
- {
- this.initialRetryWait = msec;
- return this;
- }
-
- public RetryExecutor setMaxRetryWait(int msec)
- {
- this.maxRetryWait = msec;
- return this;
- }
-
- public <T> T runInterruptible(IdempotentOperation<T> op) throws InterruptedException, ExecutionException
- {
- return run(op, true);
- }
-
- public <T> T run(IdempotentOperation<T> op) throws ExecutionException
- {
- try {
- return run(op, false);
- } catch (InterruptedException ex) {
- throw new ExecutionException("Unexpected interruption", ex);
- }
- }
-
- private <T> T run(IdempotentOperation<T> op, boolean interruptible)
- throws InterruptedException, ExecutionException
- {
- int retryWait = initialRetryWait;
- int retryCount = 0;
-
- Throwable firstException = null;
-
- while(true) {
- try {
- return op.call();
- } catch (Throwable exception) {
- if (firstException == null) {
- firstException = exception;
- }
- if (!op.isRetryableException(exception) || retryCount >= retryLimit) {
- op.onGiveup(firstException, exception);
- throw new ExecutionException(firstException);
- }
-
- retryCount++;
- op.onRetry(exception, retryCount, retryLimit, retryWait);
-
- try {
- Thread.sleep(retryWait);
- } catch (InterruptedException ex) {
- if (interruptible) {
- throw ex;
- }
- }
-
- retryWait *= 2; // exponential back-off
-
- if (retryWait > maxRetryWait) {
- retryWait = maxRetryWait;
- }
- }
- }
- }
-}
-
+package org.embulk.output.jdbc;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+public class RetryExecutor
+{
+ public static RetryExecutor retryExecutor()
+ {
+ return new RetryExecutor();
+ }
+
+ public static abstract class IdempotentOperation<T> implements Callable<T>
+ {
+ public abstract T call() throws Exception;
+
+ public void onRetry(Throwable exception, int retryCount, int retryLimit, int retryWait)
+ { }
+
+ public void onGiveup(Throwable firstException, Throwable lastException)
+ { }
+
+ public abstract boolean isRetryableException(Throwable exception);
+ }
+
+ private int retryLimit = 3;
+ private int initialRetryWait = 500;
+ private int maxRetryWait = 30*60*1000;
+
+ private RetryExecutor()
+ { }
+
+ public RetryExecutor setRetryLimit(int count)
+ {
+ this.retryLimit = count;
+ return this;
+ }
+
+ public RetryExecutor setInitialRetryWait(int msec)
+ {
+ this.initialRetryWait = msec;
+ return this;
+ }
+
+ public RetryExecutor setMaxRetryWait(int msec)
+ {
+ this.maxRetryWait = msec;
+ return this;
+ }
+
+ public <T> T runInterruptible(IdempotentOperation<T> op) throws InterruptedException, ExecutionException
+ {
+ return run(op, true);
+ }
+
+ public <T> T run(IdempotentOperation<T> op) throws ExecutionException
+ {
+ try {
+ return run(op, false);
+ } catch (InterruptedException ex) {
+ throw new ExecutionException("Unexpected interruption", ex);
+ }
+ }
+
+ private <T> T run(IdempotentOperation<T> op, boolean interruptible)
+ throws InterruptedException, ExecutionException
+ {
+ int retryWait = initialRetryWait;
+ int retryCount = 0;
+
+ Throwable firstException = null;
+
+ while(true) {
+ try {
+ return op.call();
+ } catch (Throwable exception) {
+ if (firstException == null) {
+ firstException = exception;
+ }
+ if (!op.isRetryableException(exception) || retryCount >= retryLimit) {
+ op.onGiveup(firstException, exception);
+ throw new ExecutionException(firstException);
+ }
+
+ retryCount++;
+ op.onRetry(exception, retryCount, retryLimit, retryWait);
+
+ try {
+ Thread.sleep(retryWait);
+ } catch (InterruptedException ex) {
+ if (interruptible) {
+ throw ex;
+ }
+ }
+
+ retryWait *= 2; // exponential back-off
+
+ if (retryWait > maxRetryWait) {
+ retryWait = maxRetryWait;
+ }
+ }
+ }
+ }
+}
+