src/main/java/org/embulk/output/multi/TransactionalPageOutputDelegate.java in embulk-output-multi-0.2.2 vs src/main/java/org/embulk/output/multi/TransactionalPageOutputDelegate.java in embulk-output-multi-0.3.0

- old
+ new

@@ -1,10 +1,11 @@ package org.embulk.output.multi; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.embulk.config.TaskReport; import org.embulk.spi.Page; +import org.embulk.spi.Schema; import org.embulk.spi.TransactionalPageOutput; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -18,24 +19,30 @@ private static final String THREAD_NAME_FORMAT = "multi-page-output-%s-%d"; private final OutputPluginDelegate source; private final TransactionalPageOutput delegate; private final BlockingQueue<Supplier<Object>> taskQueue; private final ExecutorService executorService; - private final Future<TaskReport> result; + private final Future<TaskReport> worker; + private boolean isFailed; - TransactionalPageOutputDelegate( + static TransactionalPageOutputDelegate open(Schema schema, int taskIndex, OutputPluginDelegate delegate) { + return new TransactionalPageOutputDelegate(taskIndex, delegate, delegate.open(schema, taskIndex)); + } + + private TransactionalPageOutputDelegate( int taskIndex, OutputPluginDelegate source, TransactionalPageOutput delegate ) { this.source = source; this.delegate = delegate; this.taskQueue = new LinkedBlockingQueue<>(); this.executorService = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setNameFormat(String.format(THREAD_NAME_FORMAT, source.getTag(), taskIndex)).build() ); - this.result = executorService.submit(new Worker()); + this.worker = executorService.submit(new Worker()); + this.isFailed = false; } void add(Page page) { taskQueue.add(() -> { delegate.add(page); @@ -56,23 +63,25 @@ return null; }); } void abort() { - taskQueue.add(() -> { + // Run abort only if the output failed. + if (isFailed) { delegate.abort(); - return null; - }); + } } TaskReport commit() { taskQueue.add(delegate::commit); try { - return result.get(); + return worker.get(); } catch (InterruptedException e) { + isFailed = true; Thread.currentThread().interrupt(); throw new RuntimeException(e); } catch (ExecutionException e) { + isFailed = true; throw new PluginExecutionException(source, e.getCause()); } finally { executorService.shutdown(); } }