src/main/java/org/embulk/executor/mapreduce/EmbulkMapReduce.java: embulk-executor-mapreduce-0.1.2 vs 0.1.3
- old
+ new
@@ -1,17 +1,20 @@
package org.embulk.executor.mapreduce;
-import java.io.EOFException;
-import java.io.InterruptedIOException;
import java.util.List;
import java.util.ArrayList;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.io.File;
import java.io.IOException;
+import java.io.EOFException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
import com.google.inject.Injector;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jruby.embed.ScriptingContainer;
import org.apache.hadoop.fs.Path;
@@ -22,10 +25,11 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.MRConfig;
@@ -46,15 +50,18 @@
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
public class EmbulkMapReduce
{
+ private static final String SYSTEM_CONFIG_SERVICE_CLASS = "mapreduce_service_class";
+
private static final String CK_SYSTEM_CONFIG = "embulk.mapreduce.systemConfig";
private static final String CK_STATE_DIRECTORY_PATH = "embulk.mapreduce.stateDirectorypath";
private static final String CK_TASK_COUNT = "embulk.mapreduce.taskCount";
private static final String CK_TASK = "embulk.mapreduce.task";
private static final String CK_PLUGIN_ARCHIVE_SPECS = "embulk.mapreduce.pluginArchive.specs";
+
private static final String PLUGIN_ARCHIVE_FILE_NAME = "gems.zip";
public static void setSystemConfig(Configuration config, ModelManager modelManager, ConfigSource systemConfig)
{
config.set(CK_SYSTEM_CONFIG, modelManager.writeObject(systemConfig));
@@ -103,108 +110,258 @@
}
public static Injector newEmbulkInstance(Configuration config)
{
ConfigSource systemConfig = getSystemConfig(config);
- return new EmbulkService(systemConfig).getInjector();
+ String serviceClassName = systemConfig.get(String.class, SYSTEM_CONFIG_SERVICE_CLASS, "org.embulk.EmbulkService");
+
+ try {
+ Object obj;
+ if (serviceClassName.equals("org.embulk.EmbulkService")) {
+ obj = new EmbulkService(systemConfig);
+ } else {
+ Class<?> serviceClass = Class.forName(serviceClassName);
+ obj = serviceClass.getConstructor(ConfigSource.class).newInstance(systemConfig);
+ }
+
+ if (obj instanceof EmbulkService) {
+ return ((EmbulkService) obj).getInjector();
+ } else {
+ return (Injector) obj.getClass().getMethod("getInjector").invoke(obj);
+ }
+
+ } catch (InvocationTargetException ex) {
+ throw Throwables.propagate(ex.getCause());
+ } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | IllegalArgumentException ex) {
+ throw Throwables.propagate(ex);
+ }
}
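
The rewritten factory reads an optional "mapreduce_service_class" system config key and loads the named class reflectively: the class must expose a constructor taking a ConfigSource and a getInjector() method. A minimal sketch of a conforming class, assuming a hypothetical name MyEmbulkService that is not part of this plugin:

    import com.google.inject.Injector;
    import org.embulk.EmbulkService;
    import org.embulk.config.ConfigSource;

    // Hypothetical service class; found via Class.forName() and built
    // through the (ConfigSource) constructor used in newEmbulkInstance().
    public class MyEmbulkService
    {
        private final EmbulkService delegate;

        public MyEmbulkService(ConfigSource systemConfig)
        {
            this.delegate = new EmbulkService(systemConfig);
        }

        // Invoked reflectively via getMethod("getInjector") because the
        // loaded class is not an EmbulkService subclass.
        public Injector getInjector()
        {
            return delegate.getInjector();
        }
    }
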
- public static List<TaskAttemptID> listAttempts(Configuration config,
- Path stateDir) throws IOException
+ public static class JobStatus
{
- FileStatus[] stats = stateDir.getFileSystem(config).listStatus(stateDir);
- ImmutableList.Builder<TaskAttemptID> builder = ImmutableList.builder();
- for (FileStatus stat : stats) {
- if (stat.getPath().getName().startsWith("attempt_") && stat.isFile()) {
- String name = stat.getPath().getName();
- try {
- builder.add(TaskAttemptID.forName(name));
- } catch (IllegalArgumentException ex) {
- // ignore
+ private final boolean completed;
+ private final float mapProgress;
+ private final float reduceProgress;
+
+ public JobStatus(boolean completed, float mapProgress, float reduceProgress)
+ {
+ this.completed = completed;
+ this.mapProgress = mapProgress;
+ this.reduceProgress = reduceProgress;
+ }
+
+ public boolean isComplete()
+ {
+ return completed;
+ }
+
+ public float getMapProgress()
+ {
+ return mapProgress;
+ }
+
+ public float getReduceProgress()
+ {
+ return reduceProgress;
+ }
+ }
+
+ public static JobStatus getJobStatus(final Job job) throws IOException
+ {
+ return hadoopOperationWithRetry("getting job status", new Callable<JobStatus>() {
+ public JobStatus call() throws IOException
+ {
+ return new JobStatus(job.isComplete(), job.mapProgress(), job.reduceProgress());
+ }
+ });
+ }
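
The JobStatus value class snapshots isComplete(), mapProgress(), and reduceProgress() in a single retried round trip. A hedged sketch of a driver-side polling loop built on it; the class name, method name, and 3-second interval are illustrative choices, not part of the plugin:

    import org.apache.hadoop.mapreduce.Job;
    import org.embulk.executor.mapreduce.EmbulkMapReduce;

    public class JobMonitor
    {
        // Polls the job through the retry-wrapped getJobStatus() helper.
        public static void waitUntilDone(Job job) throws Exception
        {
            while (true) {
                EmbulkMapReduce.JobStatus status = EmbulkMapReduce.getJobStatus(job);
                if (status.isComplete()) {
                    return;
                }
                System.out.format("map %.1f%% reduce %.1f%%%n",
                        status.getMapProgress() * 100.0,
                        status.getReduceProgress() * 100.0);
                Thread.sleep(3000);
            }
        }
    }
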
+
+ public static Counters getJobCounters(final Job job) throws IOException
+ {
+ return hadoopOperationWithRetry("getting job counters", new Callable<Counters>() {
+ public Counters call() throws IOException
+ {
+ return job.getCounters();
+ }
+ });
+ }
+
+ public static List<TaskAttemptID> listAttempts(final Configuration config,
+ final Path stateDir) throws IOException
+ {
+ return hadoopOperationWithRetry("getting list of attempt state files on "+stateDir, new Callable<List<TaskAttemptID>>() {
+ public List<TaskAttemptID> call() throws IOException
+ {
+ FileStatus[] stats = stateDir.getFileSystem(config).listStatus(stateDir);
+ ImmutableList.Builder<TaskAttemptID> builder = ImmutableList.builder();
+ for (FileStatus stat : stats) {
+ if (stat.getPath().getName().startsWith("attempt_") && stat.isFile()) {
+ String name = stat.getPath().getName();
+ TaskAttemptID id;
+ try {
+ id = TaskAttemptID.forName(name);
+ } catch (Exception ex) {
+ // ignore this file
+ continue;
+ }
+ builder.add(id);
+ }
}
+ return builder.build();
}
- }
- return builder.build();
+ });
}
- public static PluginArchive readPluginArchive(File localDirectory, Configuration config,
- Path stateDir, ModelManager modelManager) throws IOException
+ public static void writePluginArchive(final Configuration config, final Path stateDir,
+ final PluginArchive archive, final ModelManager modelManager) throws IOException
{
- List<PluginArchive.GemSpec> specs = modelManager.readObject(
- new ArrayList<PluginArchive.GemSpec>() {}.getClass(),
- config.get(CK_PLUGIN_ARCHIVE_SPECS));
- Path path = new Path(stateDir, PLUGIN_ARCHIVE_FILE_NAME);
- try (FSDataInputStream in = path.getFileSystem(config).open(path)) {
- return PluginArchive.load(localDirectory, specs, in);
- }
+ final Path path = new Path(stateDir, PLUGIN_ARCHIVE_FILE_NAME);
+ hadoopOperationWithRetry("writing plugin archive to "+path, new Callable<Void>() {
+ public Void call() throws IOException
+ {
+ stateDir.getFileSystem(config).mkdirs(stateDir);
+ try (FSDataOutputStream out = path.getFileSystem(config).create(path, true)) {
+ List<PluginArchive.GemSpec> specs = archive.dump(out);
+ config.set(CK_PLUGIN_ARCHIVE_SPECS, modelManager.writeObject(specs));
+ }
+ return null;
+ }
+ });
}
- public static void writePluginArchive(Configuration config, Path stateDir,
- PluginArchive archive, ModelManager modelManager) throws IOException
+ public static PluginArchive readPluginArchive(final File localDirectory, final Configuration config,
+ Path stateDir, final ModelManager modelManager) throws IOException
{
- Path path = new Path(stateDir, PLUGIN_ARCHIVE_FILE_NAME);
- try (FSDataOutputStream out = path.getFileSystem(config).create(path, true)) {
- List<PluginArchive.GemSpec> specs = archive.dump(out);
- config.set(CK_PLUGIN_ARCHIVE_SPECS, modelManager.writeObject(specs));
- }
+ final Path path = new Path(stateDir, PLUGIN_ARCHIVE_FILE_NAME);
+ return hadoopOperationWithRetry("reading plugin archive file from "+path, new Callable<PluginArchive>() {
+ public PluginArchive call() throws IOException
+ {
+ List<PluginArchive.GemSpec> specs = modelManager.readObject(
+ new ArrayList<PluginArchive.GemSpec>() {}.getClass(),
+ config.get(CK_PLUGIN_ARCHIVE_SPECS));
+ try (FSDataInputStream in = path.getFileSystem(config).open(path)) {
+ return PluginArchive.load(localDirectory, specs, in);
+ }
+ }
+ });
}
+ public static void writeAttemptStateFile(final Configuration config,
+ Path stateDir, final AttemptState state, final ModelManager modelManager) throws IOException
+ {
+ final Path path = new Path(stateDir, state.getAttemptId().toString());
+ hadoopOperationWithRetry("writing attempt state file to "+path, new Callable<Void>() {
+ public Void call() throws IOException
+ {
+ try (FSDataOutputStream out = path.getFileSystem(config).create(path, true)) {
+ state.writeTo(out, modelManager);
+ }
+ return null;
+ }
+ });
+ }
+
public static AttemptState readAttemptStateFile(final Configuration config,
- Path stateDir, TaskAttemptID id, final ModelManager modelManager) throws IOException
+ Path stateDir, TaskAttemptID id, final ModelManager modelManager,
+ final boolean concurrentWriteIsPossible) throws IOException
{
final Logger log = Exec.getLogger(EmbulkMapReduce.class);
final Path path = new Path(stateDir, id.toString());
try {
return retryExecutor()
.withRetryLimit(5)
.withInitialRetryWait(2 * 1000)
.withMaxRetryWait(20 * 1000)
.runInterruptible(new Retryable<AttemptState>() {
@Override
- public AttemptState call() throws IOException {
+ public AttemptState call() throws IOException
+ {
try (FSDataInputStream in = path.getFileSystem(config).open(path)) {
return AttemptState.readFrom(in, modelManager);
}
}
@Override
- public boolean isRetryableException(Exception exception) {
- // AttemptState.readFrom throws 2 types of exceptions:
- // a) EOFException: race between readFrom and writeTo. See comments on AttemptState.readFrom.
- // b) IOException "Cannot obtain block length for LocatedBlock": HDFS-1058. See https://github.com/embulk/embulk-executor-mapreduce/pull/3
- // c) other IOException: FileSystem is not working
+ public boolean isRetryableException(Exception exception)
+ {
+ // AttemptState.readFrom throws these types of exceptions:
//
- // a) and b) are temporary problem which is not critical. c) could be temporary problem and it is critical.
- // Here retries regardless of the exception type because we can't distinguish b) from c).
+ // concurrentWriteIsPossible == true:
+ // a) EOFException: race between readFrom and writeTo. See comments on AttemptState.readFrom.
+ // b) EOFException: file exists but its format is invalid because this task is a retry and the previous job/attempt left a corrupted file (empty, partially written, etc.)
+ // c) IOException "Cannot obtain block length for LocatedBlock": HDFS-1058. See https://github.com/embulk/embulk-executor-mapreduce/pull/3
+ // d) IOException: FileSystem is not working
+ // concurrentWriteIsPossible == false:
+ // e) EOFException: file exists but its format is invalid because this task is a retry and the previous job/attempt left a corrupted file (empty, partially written, etc.)
+ // f) IOException: FileSystem is not working
+ //
+ if (exception instanceof EOFException && !concurrentWriteIsPossible) {
+ // e) is not recoverable.
+ return false;
+ }
return true;
}
@Override
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
- throws RetryGiveupException {
- log.warn("Retrying opening state file " + path.getName() + " error: " + exception);
+ throws RetryGiveupException
+ {
+ log.warn("Retrying opening state file {} ({}/{}) error: {}",
+ path, retryCount, retryLimit, exception);
}
@Override
public void onGiveup(Exception firstException, Exception lastException)
- throws RetryGiveupException {
- }
+ throws RetryGiveupException
+ { }
});
} catch (RetryGiveupException e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
throw Throwables.propagate(e.getCause());
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
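
The new concurrentWriteIsPossible flag encodes the caller's knowledge about writers, per the comment block above: while attempts may still be writing, an EOFException can be a benign race and is retried; once no writer can exist, it indicates a corrupt leftover file and the read fails fast. An illustrative pair of calls, where the surrounding variables are assumed to be in scope:

    // Job still running: a concurrent writeTo may race with this read,
    // so EOFException is treated as transient and retried.
    AttemptState running = EmbulkMapReduce.readAttemptStateFile(
            config, stateDir, attemptId, modelManager, true);

    // Job finished: no writer exists, so EOFException means the previous
    // attempt left a corrupted file; give up immediately.
    AttemptState finished = EmbulkMapReduce.readAttemptStateFile(
            config, stateDir, attemptId, modelManager, false);
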
- public static void writeAttemptStateFile(Configuration config,
- Path stateDir, AttemptState state, ModelManager modelManager) throws IOException
+ private static <T> T hadoopOperationWithRetry(final String message, final Callable<T> callable) throws IOException
{
- Path path = new Path(stateDir, state.getAttemptId().toString());
- // TODO retry file create and write
- try (FSDataOutputStream out = path.getFileSystem(config).create(path, true)) {
- state.writeTo(out, modelManager);
+ final Logger log = Exec.getLogger(EmbulkMapReduce.class);
+ try {
+ return retryExecutor()
+ .withRetryLimit(5)
+ .withInitialRetryWait(2 * 1000)
+ .withMaxRetryWait(20 * 1000)
+ .runInterruptible(new Retryable<T>() {
+ @Override
+ public T call() throws Exception
+ {
+ return callable.call();
+ }
+
+ @Override
+ public boolean isRetryableException(Exception exception)
+ {
+ return true;
+ }
+
+ @Override
+ public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
+ throws RetryGiveupException
+ {
+ log.warn("Retrying {} ({}/{}) error: {}",
+ message, retryCount, retryLimit, exception);
+ }
+
+ @Override
+ public void onGiveup(Exception firstException, Exception lastException)
+ throws RetryGiveupException
+ { }
+ });
+ } catch (RetryGiveupException e) {
+ Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
+ throw Throwables.propagate(e.getCause());
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
}
}
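
All of the HDFS and job-client calls above now funnel through this single helper, which retries any exception up to 5 times with a 2-second initial and 20-second maximum wait. New Hadoop touch points can reuse the same pattern; a sketch with a hypothetical cleanup method that is not part of the actual class:

    // Hypothetical helper, shown only to illustrate the wrapper pattern.
    public static void deleteAttemptStateFile(final Configuration config, final Path path) throws IOException
    {
        hadoopOperationWithRetry("deleting attempt state file "+path, new Callable<Void>() {
            public Void call() throws IOException
            {
                // FileSystem.delete(path, false) removes a single file (non-recursive)
                path.getFileSystem(config).delete(path, false);
                return null;
            }
        });
    }
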
public static class SessionRunner
{