src/main/java/org/embulk/executor/mapreduce/EmbulkMapReduce.java in embulk-executor-mapreduce-0.1.3 vs src/main/java/org/embulk/executor/mapreduce/EmbulkMapReduce.java in embulk-executor-mapreduce-0.1.4
- old
+ new
@@ -3,10 +3,12 @@
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.io.File;
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.EOFException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import com.google.inject.Injector;
@@ -46,10 +48,11 @@
import org.embulk.spi.util.RetryExecutor.Retryable;
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import org.embulk.EmbulkService;
import org.slf4j.Logger;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
public class EmbulkMapReduce
{
private static final String SYSTEM_CONFIG_SERVICE_CLASS = "mapreduce_service_class";
@@ -69,12 +72,14 @@
public static ConfigSource getSystemConfig(Configuration config)
{
try {
ModelManager bootstrapModelManager = new ModelManager(null, new ObjectMapper());
- return new ConfigLoader(bootstrapModelManager).fromJson(
- new JsonFactory().createParser(config.get(CK_SYSTEM_CONFIG))); // TODO add fromJson(String)
+ String json = config.get(CK_SYSTEM_CONFIG);
+ try (InputStream in = new ByteArrayInputStream(json.getBytes(UTF_8))) {
+ return new ConfigLoader(bootstrapModelManager).fromJson(in);
+ }
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
@@ -322,11 +327,16 @@
}
}
private static <T> T hadoopOperationWithRetry(final String message, final Callable<T> callable) throws IOException
{
- final Logger log = Exec.getLogger(EmbulkMapReduce.class);
+ return hadoopOperationWithRetry(Exec.getLogger(EmbulkMapReduce.class), message, callable);
+ }
+
+ private static <T> T hadoopOperationWithRetry(final Logger log,
+ final String message, final Callable<T> callable) throws IOException
+ {
try {
return retryExecutor()
.withRetryLimit(5)
.withInitialRetryWait(2 * 1000)
.withMaxRetryWait(20 * 1000)
@@ -377,11 +387,11 @@
{
this.config = context.getConfiguration();
this.injector = newEmbulkInstance(context.getConfiguration());
this.modelManager = injector.getInstance(ModelManager.class);
this.task = getExecutorTask(injector, context.getConfiguration());
- this.session = new ExecSession(injector, task.getExecConfig());
+ this.session = ExecSession.builder(injector).fromExecConfig(task.getExecConfig()).build();
try {
LocalDirAllocator localDirAllocator = new LocalDirAllocator(MRConfig.LOCAL_DIR);
Path destPath = localDirAllocator.getLocalPathForWrite("gems", config);
this.localGemPath = new File(destPath.toString());
@@ -503,14 +513,21 @@
{
private Context context;
private SessionRunner runner;
@Override
- public void setup(Context context) throws IOException
+ public void setup(Context context) throws IOException, InterruptedException
{
this.context = context;
this.runner = new SessionRunner(context);
- runner.readPluginArchive().restoreLoadPathsTo(runner.getScriptingContainer());
+
+ runner.execSession(new ExecAction<Void>() { // for Exec.getLogger
+ public Void run() throws IOException
+ {
+ runner.readPluginArchive().restoreLoadPathsTo(runner.getScriptingContainer());
+ return null;
+ }
+ });
}
@Override
public void map(IntWritable key, NullWritable value, final Context context) throws IOException, InterruptedException
{