src/main/java/org/embulk/executor/mapreduce/EmbulkMapReduce.java in embulk-executor-mapreduce-0.1.3 vs src/main/java/org/embulk/executor/mapreduce/EmbulkMapReduce.java in embulk-executor-mapreduce-0.1.4

- old
+ new

@@ -3,10 +3,12 @@ import java.util.List; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.io.File; +import java.io.InputStream; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.EOFException; import java.io.InterruptedIOException; import java.lang.reflect.InvocationTargetException; import com.google.inject.Injector; @@ -46,10 +48,11 @@ import org.embulk.spi.util.RetryExecutor.Retryable; import org.embulk.spi.util.RetryExecutor.RetryGiveupException; import org.embulk.EmbulkService; import org.slf4j.Logger; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.embulk.spi.util.RetryExecutor.retryExecutor; public class EmbulkMapReduce { private static final String SYSTEM_CONFIG_SERVICE_CLASS = "mapreduce_service_class"; @@ -69,12 +72,14 @@ public static ConfigSource getSystemConfig(Configuration config) { try { ModelManager bootstrapModelManager = new ModelManager(null, new ObjectMapper()); - return new ConfigLoader(bootstrapModelManager).fromJson( - new JsonFactory().createParser(config.get(CK_SYSTEM_CONFIG))); // TODO add fromJson(String) + String json = config.get(CK_SYSTEM_CONFIG); + try (InputStream in = new ByteArrayInputStream(json.getBytes(UTF_8))) { + return new ConfigLoader(bootstrapModelManager).fromJson(in); + } } catch (IOException e) { throw Throwables.propagate(e); } } @@ -322,11 +327,16 @@ } } private static <T> T hadoopOperationWithRetry(final String message, final Callable<T> callable) throws IOException { - final Logger log = Exec.getLogger(EmbulkMapReduce.class); + return hadoopOperationWithRetry(Exec.getLogger(EmbulkMapReduce.class), message, callable); + } + + private static <T> T hadoopOperationWithRetry(final Logger log, + final String message, final Callable<T> callable) throws IOException + { try { return retryExecutor() .withRetryLimit(5) .withInitialRetryWait(2 * 1000) .withMaxRetryWait(20 * 1000) @@ -377,11 +387,11 @@ { this.config = context.getConfiguration(); this.injector = newEmbulkInstance(context.getConfiguration()); this.modelManager = injector.getInstance(ModelManager.class); this.task = getExecutorTask(injector, context.getConfiguration()); - this.session = new ExecSession(injector, task.getExecConfig()); + this.session = ExecSession.builder(injector).fromExecConfig(task.getExecConfig()).build(); try { LocalDirAllocator localDirAllocator = new LocalDirAllocator(MRConfig.LOCAL_DIR); Path destPath = localDirAllocator.getLocalPathForWrite("gems", config); this.localGemPath = new File(destPath.toString()); @@ -503,14 +513,21 @@ { private Context context; private SessionRunner runner; @Override - public void setup(Context context) throws IOException + public void setup(Context context) throws IOException, InterruptedException { this.context = context; this.runner = new SessionRunner(context); - runner.readPluginArchive().restoreLoadPathsTo(runner.getScriptingContainer()); + + runner.execSession(new ExecAction<Void>() { // for Exec.getLogger + public Void run() throws IOException + { + runner.readPluginArchive().restoreLoadPathsTo(runner.getScriptingContainer()); + return null; + } + }); } @Override public void map(IntWritable key, NullWritable value, final Context context) throws IOException, InterruptedException {