src/main/java/org/embulk/output/multi/TransactionalPageOutputDelegate.java in embulk-output-multi-0.2.2 vs src/main/java/org/embulk/output/multi/TransactionalPageOutputDelegate.java in embulk-output-multi-0.3.0
- old
+ new
@@ -1,10 +1,11 @@
package org.embulk.output.multi;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.embulk.config.TaskReport;
import org.embulk.spi.Page;
+import org.embulk.spi.Schema;
import org.embulk.spi.TransactionalPageOutput;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -18,24 +19,30 @@
private static final String THREAD_NAME_FORMAT = "multi-page-output-%s-%d";
private final OutputPluginDelegate source;
private final TransactionalPageOutput delegate;
private final BlockingQueue<Supplier<Object>> taskQueue;
private final ExecutorService executorService;
- private final Future<TaskReport> result;
+ private final Future<TaskReport> worker;
+ private boolean isFailed;
- TransactionalPageOutputDelegate(
+ static TransactionalPageOutputDelegate open(Schema schema, int taskIndex, OutputPluginDelegate delegate) {
+ return new TransactionalPageOutputDelegate(taskIndex, delegate, delegate.open(schema, taskIndex));
+ }
+
+ private TransactionalPageOutputDelegate(
int taskIndex,
OutputPluginDelegate source,
TransactionalPageOutput delegate
) {
this.source = source;
this.delegate = delegate;
this.taskQueue = new LinkedBlockingQueue<>();
this.executorService = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(String.format(THREAD_NAME_FORMAT, source.getTag(), taskIndex)).build()
);
- this.result = executorService.submit(new Worker());
+ this.worker = executorService.submit(new Worker());
+ this.isFailed = false;
}
void add(Page page) {
taskQueue.add(() -> {
delegate.add(page);
@@ -56,23 +63,25 @@
return null;
});
}
void abort() {
- taskQueue.add(() -> {
+ // Run abort only if the output failed.
+ if (isFailed) {
delegate.abort();
- return null;
- });
+ }
}
TaskReport commit() {
taskQueue.add(delegate::commit);
try {
- return result.get();
+ return worker.get();
} catch (InterruptedException e) {
+ isFailed = true;
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
+ isFailed = true;
throw new PluginExecutionException(source, e.getCause());
} finally {
executorService.shutdown();
}
}