src/main/java/org/embulk/executor/mapreduce/EmbulkMapReduce.java: embulk-executor-mapreduce-0.1.2 vs embulk-executor-mapreduce-0.1.3

- old
+ new

@@ -1,17 +1,20 @@
 package org.embulk.executor.mapreduce;
 
-import java.io.EOFException;
-import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.io.File;
 import java.io.IOException;
+import java.io.EOFException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
 import com.google.inject.Injector;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.jruby.embed.ScriptingContainer;
 import org.apache.hadoop.fs.Path;
@@ -22,10 +25,11 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -46,15 +50,18 @@
 import static org.embulk.spi.util.RetryExecutor.retryExecutor;
 
 public class EmbulkMapReduce
 {
+    private static final String SYSTEM_CONFIG_SERVICE_CLASS = "mapreduce_service_class";
+
     private static final String CK_SYSTEM_CONFIG = "embulk.mapreduce.systemConfig";
     private static final String CK_STATE_DIRECTORY_PATH = "embulk.mapreduce.stateDirectorypath";
     private static final String CK_TASK_COUNT = "embulk.mapreduce.taskCount";
     private static final String CK_TASK = "embulk.mapreduce.task";
     private static final String CK_PLUGIN_ARCHIVE_SPECS = "embulk.mapreduce.pluginArchive.specs";
+
     private static final String PLUGIN_ARCHIVE_FILE_NAME = "gems.zip";
 
     public static void setSystemConfig(Configuration config, ModelManager modelManager, ConfigSource systemConfig)
     {
         config.set(CK_SYSTEM_CONFIG, modelManager.writeObject(systemConfig));
@@ -103,108 +110,258 @@
     }
 
     public static Injector newEmbulkInstance(Configuration config)
     {
         ConfigSource systemConfig = getSystemConfig(config);
-        return new EmbulkService(systemConfig).getInjector();
+        String serviceClassName = systemConfig.get(String.class, SYSTEM_CONFIG_SERVICE_CLASS, "org.embulk.EmbulkService");
+
+        try {
+            Object obj;
+            if (serviceClassName.equals("org.embulk.EmbulkService")) {
+                obj = new EmbulkService(systemConfig);
+            } else {
+                Class<?> serviceClass = Class.forName(serviceClassName);
+                obj = serviceClass.getConstructor(ConfigSource.class).newInstance(systemConfig);
+            }
+
+            if (obj instanceof EmbulkService) {
+                return ((EmbulkService) obj).getInjector();
+            } else {
+                return (Injector) obj.getClass().getMethod("getInjector").invoke(obj);
+            }
+
+        } catch (InvocationTargetException ex) {
+            throw Throwables.propagate(ex.getCause());
+        } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | IllegalArgumentException ex) {
+            throw Throwables.propagate(ex);
+        }
     }
 
-    public static List<TaskAttemptID> listAttempts(Configuration config,
-            Path stateDir) throws IOException
-    {
-        FileStatus[] stats = stateDir.getFileSystem(config).listStatus(stateDir);
-        ImmutableList.Builder<TaskAttemptID> builder = ImmutableList.builder();
-        for (FileStatus stat : stats) {
-            if (stat.getPath().getName().startsWith("attempt_") && stat.isFile()) {
-                String name = stat.getPath().getName();
-                try {
-                    builder.add(TaskAttemptID.forName(name));
-                } catch (IllegalArgumentException ex) {
-                    // ignore
-                }
-            }
-        }
-        return builder.build();
-    }
+    public static class JobStatus
+    {
+        private final boolean completed;
+        private final float mapProgress;
+        private final float reduceProgress;
+
+        public JobStatus(boolean completed, float mapProgress, float reduceProgress)
+        {
+            this.completed = completed;
+            this.mapProgress = mapProgress;
+            this.reduceProgress = reduceProgress;
+        }
+
+        public boolean isComplete()
+        {
+            return completed;
+        }
+
+        public float getMapProgress()
+        {
+            return mapProgress;
+        }
+
+        public float getReduceProgress()
+        {
+            return reduceProgress;
+        }
+    }
+
+    public static JobStatus getJobStatus(final Job job) throws IOException
+    {
+        return hadoopOperationWithRetry("getting job status", new Callable<JobStatus>() {
+            public JobStatus call() throws IOException
+            {
+                return new JobStatus(job.isComplete(), job.mapProgress(), job.reduceProgress());
+            }
+        });
+    }
+
+    public static Counters getJobCounters(final Job job) throws IOException
+    {
+        return hadoopOperationWithRetry("getting job counters", new Callable<Counters>() {
+            public Counters call() throws IOException
+            {
+                return job.getCounters();
+            }
+        });
+    }
+
+    public static List<TaskAttemptID> listAttempts(final Configuration config,
+            final Path stateDir) throws IOException
+    {
+        return hadoopOperationWithRetry("getting list of attempt state files on "+stateDir, new Callable<List<TaskAttemptID>>() {
+            public List<TaskAttemptID> call() throws IOException
+            {
+                FileStatus[] stats = stateDir.getFileSystem(config).listStatus(stateDir);
+                ImmutableList.Builder<TaskAttemptID> builder = ImmutableList.builder();
+                for (FileStatus stat : stats) {
+                    if (stat.getPath().getName().startsWith("attempt_") && stat.isFile()) {
+                        String name = stat.getPath().getName();
+                        TaskAttemptID id;
+                        try {
+                            id = TaskAttemptID.forName(name);
+                        } catch (Exception ex) {
+                            // ignore this file
+                            continue;
+                        }
+                        builder.add(id);
+                    }
+                }
+                return builder.build();
+            }
+        });
+    }
 
-    public static PluginArchive readPluginArchive(File localDirectory, Configuration config,
-            Path stateDir, ModelManager modelManager) throws IOException
-    {
-        List<PluginArchive.GemSpec> specs = modelManager.readObject(
-                new ArrayList<PluginArchive.GemSpec>() {}.getClass(),
-                config.get(CK_PLUGIN_ARCHIVE_SPECS));
-        Path path = new Path(stateDir, PLUGIN_ARCHIVE_FILE_NAME);
-        try (FSDataInputStream in = path.getFileSystem(config).open(path)) {
-            return PluginArchive.load(localDirectory, specs, in);
-        }
-    }
+    public static void writePluginArchive(final Configuration config, final Path stateDir,
+            final PluginArchive archive, final ModelManager modelManager) throws IOException
+    {
+        final Path path = new Path(stateDir, PLUGIN_ARCHIVE_FILE_NAME);
+        hadoopOperationWithRetry("writing plugin archive to "+path, new Callable<Void>() {
+            public Void call() throws IOException
+            {
+                stateDir.getFileSystem(config).mkdirs(stateDir);
+                try (FSDataOutputStream out = path.getFileSystem(config).create(path, true)) {
+                    List<PluginArchive.GemSpec> specs = archive.dump(out);
+                    config.set(CK_PLUGIN_ARCHIVE_SPECS, modelManager.writeObject(specs));
                }
+                return null;
+            }
+        });
+    }
 
-    public static void writePluginArchive(Configuration config, Path stateDir,
-            PluginArchive archive, ModelManager modelManager) throws IOException
-    {
-        Path path = new Path(stateDir, PLUGIN_ARCHIVE_FILE_NAME);
-        try (FSDataOutputStream out = path.getFileSystem(config).create(path, true)) {
-            List<PluginArchive.GemSpec> specs = archive.dump(out);
-            config.set(CK_PLUGIN_ARCHIVE_SPECS, modelManager.writeObject(specs));
-        }
-    }
+    public static PluginArchive readPluginArchive(final File localDirectory, final Configuration config,
            Path stateDir, final ModelManager modelManager) throws IOException
+    {
+        final Path path = new Path(stateDir, PLUGIN_ARCHIVE_FILE_NAME);
+        return hadoopOperationWithRetry("reading plugin archive file from "+path, new Callable<PluginArchive>() {
+            public PluginArchive call() throws IOException
+            {
+                List<PluginArchive.GemSpec> specs = modelManager.readObject(
+                        new ArrayList<PluginArchive.GemSpec>() {}.getClass(),
+                        config.get(CK_PLUGIN_ARCHIVE_SPECS));
+                try (FSDataInputStream in = path.getFileSystem(config).open(path)) {
+                    return PluginArchive.load(localDirectory, specs, in);
+                }
+            }
+        });
+    }
 
+    public static void writeAttemptStateFile(final Configuration config,
+            Path stateDir, final AttemptState state, final ModelManager modelManager) throws IOException
+    {
+        final Path path = new Path(stateDir, state.getAttemptId().toString());
+        hadoopOperationWithRetry("writing attempt state file to "+path, new Callable<Void>() {
+            public Void call() throws IOException
+            {
+                try (FSDataOutputStream out = path.getFileSystem(config).create(path, true)) {
+                    state.writeTo(out, modelManager);
+                }
+                return null;
+            }
+        });
+    }
+
     public static AttemptState readAttemptStateFile(final Configuration config,
-            Path stateDir, TaskAttemptID id, final ModelManager modelManager) throws IOException
+            Path stateDir, TaskAttemptID id, final ModelManager modelManager,
+            final boolean concurrentWriteIsPossible) throws IOException
     {
         final Logger log = Exec.getLogger(EmbulkMapReduce.class);
         final Path path = new Path(stateDir, id.toString());
         try {
             return retryExecutor()
                 .withRetryLimit(5)
                 .withInitialRetryWait(2 * 1000)
                 .withMaxRetryWait(20 * 1000)
                 .runInterruptible(new Retryable<AttemptState>() {
                     @Override
-                    public AttemptState call() throws IOException {
+                    public AttemptState call() throws IOException
+                    {
                         try (FSDataInputStream in = path.getFileSystem(config).open(path)) {
                             return AttemptState.readFrom(in, modelManager);
                         }
                     }
 
                     @Override
-                    public boolean isRetryableException(Exception exception) {
-                        // AttemptState.readFrom throws 2 types of exceptions:
-                        // a) EOFException: race between readFrom and writeTo. See comments on AttemptState.readFrom.
-                        // b) IOException "Cannot obtain block length for LocatedBlock": HDFS-1058. See https://github.com/embulk/embulk-executor-mapreduce/pull/3
-                        // c) other IOException: FileSystem is not working
-                        //
-                        // a) and b) are temporary problem which is not critical. c) could be temporary problem and it is critical.
-                        // Here retries regardless of the exception type because we can't distinguish b) from c).
+                    public boolean isRetryableException(Exception exception)
+                    {
+                        // AttemptState.readFrom throws 4 types of exceptions:
+                        //
+                        // concurrentWriteIsPossible == true:
+                        //   a) EOFException: race between readFrom and writeTo. See comments on AttemptState.readFrom.
+                        //   b) EOFException: file exists but its format is invalid because this task is retried and last job/attempt left corrupted files (such as empty, partially written, etc)
+                        //   c) IOException "Cannot obtain block length for LocatedBlock": HDFS-1058. See https://github.com/embulk/embulk-executor-mapreduce/pull/3
+                        //   d) IOException: FileSystem is not working
+                        // concurrentWriteIsPossible == false:
+                        //   e) EOFException: file exists but its format is invalid because this task is retried and last job/attempt left corrupted files (such as empty, partially written, etc)
+                        //   f) IOException: FileSystem is not working
+                        //
+                        if (exception instanceof EOFException && !concurrentWriteIsPossible) {
+                            // e) is not recoverable.
+                            return false;
+                        }
                         return true;
                     }
 
                     @Override
                     public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
-                            throws RetryGiveupException {
-                        log.warn("Retrying opening state file " + path.getName() + " error: " + exception);
+                            throws RetryGiveupException
+                    {
+                        log.warn("Retrying opening state file {} ({}/{}) error: {}",
+                                path, retryCount, retryLimit, exception);
                     }
 
                     @Override
                     public void onGiveup(Exception firstException, Exception lastException)
-                            throws RetryGiveupException {
-                    }
+                            throws RetryGiveupException
+                    { }
                 });
         } catch (RetryGiveupException e) {
             Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
             throw Throwables.propagate(e.getCause());
         } catch (InterruptedException e) {
             throw new InterruptedIOException();
         }
     }
 
-    public static void writeAttemptStateFile(Configuration config,
-            Path stateDir, AttemptState state, ModelManager modelManager) throws IOException
+    private static <T> T hadoopOperationWithRetry(final String message, final Callable<T> callable) throws IOException
     {
-        Path path = new Path(stateDir, state.getAttemptId().toString());
-        // TODO retry file create and write
-        try (FSDataOutputStream out = path.getFileSystem(config).create(path, true)) {
-            state.writeTo(out, modelManager);
+        final Logger log = Exec.getLogger(EmbulkMapReduce.class);
+        try {
+            return retryExecutor()
+                .withRetryLimit(5)
+                .withInitialRetryWait(2 * 1000)
+                .withMaxRetryWait(20 * 1000)
+                .runInterruptible(new Retryable<T>() {
+                    @Override
+                    public T call() throws Exception
+                    {
+                        return callable.call();
+                    }
+
+                    @Override
+                    public boolean isRetryableException(Exception exception)
+                    {
+                        return true;
+                    }
+
+                    @Override
+                    public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
+                            throws RetryGiveupException
+                    {
+                        log.warn("Retrying {} ({}/{}) error: {}",
+                                message, retryCount, retryLimit, exception);
+                    }
+
+                    @Override
+                    public void onGiveup(Exception firstException, Exception lastException)
+                            throws RetryGiveupException
+                    { }
+                });
+        } catch (RetryGiveupException e) {
+            Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
+            throw Throwables.propagate(e.getCause());
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
         }
     }
 
     public static class SessionRunner {
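
The new SYSTEM_CONFIG_SERVICE_CLASS lookup makes the bootstrap pluggable: newEmbulkInstance() now reads the "mapreduce_service_class" system config key (defaulting to org.embulk.EmbulkService), instantiates that class reflectively through a public constructor taking a ConfigSource, and obtains the Injector either directly (if the object is an EmbulkService) or via a reflective call to a public getInjector() method. A minimal sketch of a class satisfying that contract follows; the package and class names are hypothetical, chosen only for illustration:

    package com.example;  // hypothetical package

    import com.google.inject.Injector;
    import org.embulk.EmbulkService;
    import org.embulk.config.ConfigSource;

    // Loaded when the system config contains:
    //   mapreduce_service_class: com.example.CustomEmbulkService
    // newEmbulkInstance() calls getConstructor(ConfigSource.class).newInstance(systemConfig),
    // then invokes getInjector() reflectively because this class is not an EmbulkService.
    public class CustomEmbulkService
    {
        private final EmbulkService delegate;

        public CustomEmbulkService(ConfigSource systemConfig)
        {
            // Wrap the standard service; a real implementation could customize setup here.
            this.delegate = new EmbulkService(systemConfig);
        }

        public Injector getInjector()
        {
            return delegate.getInjector();
        }
    }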
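
Most HDFS accesses in this file now funnel through the private hadoopOperationWithRetry() helper, which retries any exception up to 5 times with a wait growing from 2 to 20 seconds and, on give-up, rethrows the cause as an IOException where possible. A sketch of how further code inside EmbulkMapReduce would use it; the stateDirExists() helper itself is hypothetical, not part of the diff:

    // Hypothetical helper inside EmbulkMapReduce, showing the wrapper's contract:
    // the Hadoop call goes into a Callable, and hadoopOperationWithRetry() handles
    // retries, logging, and unwrapping RetryGiveupException into IOException.
    public static boolean stateDirExists(final Configuration config, final Path stateDir)
            throws IOException
    {
        return hadoopOperationWithRetry("checking state directory " + stateDir,
                new Callable<Boolean>() {
                    public Boolean call() throws IOException
                    {
                        return stateDir.getFileSystem(config).exists(stateDir);
                    }
                });
    }

Note that readAttemptStateFile() keeps its own retryExecutor() loop instead of using this wrapper, because its retry decision depends on the new concurrentWriteIsPossible flag: when no concurrent writer can exist, an EOFException means a corrupted leftover file from a previous attempt, and retrying cannot help.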