package org.embulk.input.s3;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.google.common.annotations.VisibleForTesting;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigInject;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.input.s3.explorer.S3NameOrderPrefixFileExplorer;
import org.embulk.input.s3.explorer.S3SingleFileExplorer;
import org.embulk.input.s3.explorer.S3TimeOrderPrefixFileExplorer;
import org.embulk.input.s3.utils.DateUtils;
import org.embulk.spi.BufferAllocator;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.spi.util.InputStreamFileInput;
import org.embulk.spi.util.InputStreamFileInput.InputStreamWithHints;
import org.embulk.spi.util.ResumableInputStream;
import org.embulk.spi.util.RetryExecutor;
import org.embulk.util.aws.credentials.AwsCredentials;
import org.embulk.util.aws.credentials.AwsCredentialsTask;
import org.slf4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

import static java.lang.String.format;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;

public abstract class AbstractS3FileInputPlugin
        implements FileInputPlugin
{
    private static final Logger LOGGER = Exec.getLogger(S3FileInputPlugin.class);
    private static final String FULL_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    public interface PluginTask
            extends AwsCredentialsTask, FileList.Task, RetrySupportPluginTask, Task
    {
        @Config("bucket")
        String getBucket();

        @Config("path_prefix")
        @ConfigDefault("null")
        Optional<String> getPathPrefix();

        @Config("path")
        @ConfigDefault("null")
        Optional<String> getPath();

        @Config("last_path")
        @ConfigDefault("null")
        Optional<String> getLastPath();

        @Config("access_key_id")
        @ConfigDefault("null")
        Optional<String> getAccessKeyId();

        @Config("http_proxy")
        @ConfigDefault("null")
        Optional<HttpProxy> getHttpProxy();

        void setHttpProxy(Optional<HttpProxy> httpProxy);

        @Config("incremental")
        @ConfigDefault("true")
        boolean getIncremental();

        @Config("skip_glacier_objects")
        @ConfigDefault("false")
        boolean getSkipGlacierObjects();

        @Config("use_modified_time")
        @ConfigDefault("false")
        boolean getUseModifiedTime();

        @Config("last_modified_time")
        @ConfigDefault("null")
        Optional<String> getLastModifiedTime();

        // TODO timeout, ssl, etc

        ////////////////////////////////////////
        // Internal configurations
        ////////////////////////////////////////

        FileList getFiles();

        void setFiles(FileList files);

        /**
         * end_modified_time is conditionally set if modified_time mode is enabled.
         *
         * It is internal state and must not be set in config.yml.
         */
        @Config("__end_modified_time")
        @ConfigDefault("null")
        Optional<Date> getEndModifiedTime();

        void setEndModifiedTime(Optional<Date> endModifiedTime);

        @ConfigInject
        BufferAllocator getBufferAllocator();
    }
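    // For reference, the task above corresponds to a config.yml section like the
    // following (a minimal sketch; bucket, prefix, and credential values are
    // placeholders, not real settings):
    //
    //   in:
    //     type: s3
    //     bucket: my-example-bucket
    //     path_prefix: logs/
    //     access_key_id: XXXXXXXX
    //     secret_access_key: XXXXXXXX
    //     incremental: true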
    protected abstract Class<? extends PluginTask> getTaskClass();

    @Override
    public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
    {
        PluginTask task = config.loadConfig(getTaskClass());

        errorIfInternalParamsAreSet(task);
        validateInputTask(task);

        // list files recursively
        task.setFiles(listFiles(task));

        // the number of tasks is the same as the number of files
        return resume(task.dump(), task.getFiles().getTaskCount(), control);
    }

    @Override
    public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control)
    {
        PluginTask task = taskSource.loadTask(getTaskClass());

        // validate task
        newS3Client(task);

        control.run(taskSource, taskCount);

        // build next config
        ConfigDiff configDiff = Exec.newConfigDiff();

        // last_path
        if (task.getIncremental()) {
            if (task.getUseModifiedTime()) {
                Date endModifiedTime = task.getEndModifiedTime().orElse(new Date());
                configDiff.set("last_modified_time", new SimpleDateFormat(FULL_DATE_FORMAT).format(endModifiedTime));
            }
            else {
                Optional<String> lastPath = task.getFiles().getLastPath(task.getLastPath());
                LOGGER.info("Incremental job, setting last_path to [{}]", lastPath.orElse(""));
                configDiff.set("last_path", lastPath);
            }
        }
        return configDiff;
    }

    @Override
    public void cleanup(TaskSource taskSource, int taskCount, List<TaskReport> successTaskReports)
    {
        // do nothing
    }

    /**
     * Provide an overridable default client.
     * Since this returns an immutable object, it is not meant for further customization by mutation;
     * e.g., {@link AmazonS3#setEndpoint} will throw a runtime {@link UnsupportedOperationException}.
     * Subclass customization should be done through {@link AbstractS3FileInputPlugin#defaultS3ClientBuilder}.
     *
     * @param task Embulk plugin task
     * @return AmazonS3 client
     */
    protected AmazonS3 newS3Client(PluginTask task)
    {
        return defaultS3ClientBuilder(task).build();
    }

    /**
     * A base builder for the subclasses to then customize.
     *
     * @param task Embulk plugin task
     * @return AmazonS3 client builder
     */
    protected AmazonS3ClientBuilder defaultS3ClientBuilder(PluginTask task)
    {
        return AmazonS3ClientBuilder
                .standard()
                .withCredentials(getCredentialsProvider(task))
                .withClientConfiguration(getClientConfiguration(task));
    }
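    // Example customization point: a subclass can extend the builder above instead of
    // mutating the built client. A sketch, assuming an S3-compatible service (the
    // endpoint URL and region are hypothetical; EndpointConfiguration is
    // com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration):
    //
    //   @Override
    //   protected AmazonS3ClientBuilder defaultS3ClientBuilder(PluginTask task)
    //   {
    //       return super.defaultS3ClientBuilder(task)
    //               .withEndpointConfiguration(
    //                       new AwsClientBuilder.EndpointConfiguration("https://s3.example.com", "us-east-1"));
    //   }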
    protected AWSCredentialsProvider getCredentialsProvider(PluginTask task)
    {
        return AwsCredentials.getAWSCredentialsProvider(task);
    }

    protected ClientConfiguration getClientConfiguration(PluginTask task)
    {
        ClientConfiguration clientConfig = new ClientConfiguration();

        //clientConfig.setProtocol(Protocol.HTTP);
        clientConfig.setMaxConnections(50); // SDK default: 50
        // clientConfig.setMaxErrorRetry(3); // SDK default: 3
        clientConfig.setSocketTimeout(8 * 60 * 1000); // SDK default: 50*1000
        clientConfig.setRetryPolicy(PredefinedRetryPolicies.NO_RETRY_POLICY);

        // set http proxy
        if (task.getHttpProxy().isPresent()) {
            setHttpProxyInAwsClient(clientConfig, task.getHttpProxy().get());
        }

        return clientConfig;
    }

    private void setHttpProxyInAwsClient(ClientConfiguration clientConfig, HttpProxy httpProxy)
    {
        // host
        clientConfig.setProxyHost(httpProxy.getHost());

        // port
        if (httpProxy.getPort().isPresent()) {
            clientConfig.setProxyPort(httpProxy.getPort().get());
        }

        // https
        clientConfig.setProtocol(httpProxy.getHttps() ? Protocol.HTTPS : Protocol.HTTP);

        // user
        if (httpProxy.getUser().isPresent()) {
            clientConfig.setProxyUsername(httpProxy.getUser().get());
        }

        // password
        if (httpProxy.getPassword().isPresent()) {
            clientConfig.setProxyPassword(httpProxy.getPassword().get());
        }
    }

    /**
     * Build the common retry executor from the retry configuration params of the plugin task.
     *
     * @param task Plugin task.
     * @return RetryExecutor object
     */
    private static RetryExecutor retryExecutorFrom(RetrySupportPluginTask task)
    {
        return retryExecutor()
                .withRetryLimit(task.getMaximumRetries())
                .withInitialRetryWait(task.getInitialRetryIntervalMillis())
                .withMaxRetryWait(task.getMaximumRetryIntervalMillis());
    }
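    // Note on the two retry layers: getClientConfiguration() disables the AWS SDK's
    // built-in retries (PredefinedRetryPolicies.NO_RETRY_POLICY), presumably so that
    // all retrying is centralized in this RetryExecutor and in the DefaultRetryable
    // calls below, rather than having SDK retries stack multiplicatively on top.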
    private FileList listFiles(final PluginTask task)
    {
        try {
            AmazonS3 client = newS3Client(task);
            String bucketName = task.getBucket();
            FileList.Builder builder = new FileList.Builder(task);
            RetryExecutor retryExec = retryExecutorFrom(task);

            if (task.getPath().isPresent()) {
                LOGGER.info("Start getting object with path: [{}]", task.getPath().get());
                new S3SingleFileExplorer(bucketName, client, retryExec, task.getPath().get()).addToBuilder(builder);
                return builder.build();
            }

            // No need to verify here that the path prefix exists: validateInputTask() already
            // guarantees that either path or path_prefix is set.
            LOGGER.info("Start listing file with prefix [{}]", task.getPathPrefix().get());
            if (task.getPathPrefix().get().equals("/")) {
                LOGGER.info("Listing files with prefix \"/\". This doesn't mean all files in a bucket. If you intend to read all files, use \"path_prefix: ''\" (empty string) instead.");
            }

            if (task.getUseModifiedTime()) {
                Date now = new Date();
                Optional<Date> from = task.getLastModifiedTime().isPresent()
                        ? Optional.of(DateUtils.parse(task.getLastModifiedTime().get(), Collections.singletonList(FULL_DATE_FORMAT)))
                        : Optional.empty();
                task.setEndModifiedTime(Optional.of(now));

                new S3TimeOrderPrefixFileExplorer(bucketName, client, retryExec, task.getPathPrefix().get(),
                        task.getSkipGlacierObjects(), from, now).addToBuilder(builder);
            }
            else {
                new S3NameOrderPrefixFileExplorer(bucketName, client, retryExec, task.getPathPrefix().get(),
                        task.getSkipGlacierObjects(), task.getLastPath().orElse(null)).addToBuilder(builder);
            }

            LOGGER.info("Found total [{}] files", builder.size());
            return builder.build();
        }
        catch (AmazonServiceException ex) {
            if (ex.getErrorType().equals(AmazonServiceException.ErrorType.Client)) {
                // HTTP 40x errors: auth error, bucket doesn't exist, etc. See the AWS document for the full list:
                // http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
                // Any 40x other than 400 Bad Request indicates a config problem, and so does
                // 400 with error code "ExpiredToken"; both throw ConfigException.
                if (ex.getStatusCode() != 400
                        || "ExpiredToken".equalsIgnoreCase(ex.getErrorCode())) {
                    throw new ConfigException(ex);
                }
            }
            throw ex;
        }
    }

    private void validateInputTask(final PluginTask task)
    {
        if (!task.getPathPrefix().isPresent() && !task.getPath().isPresent()) {
            throw new ConfigException("Either path or path_prefix is required");
        }
    }

    @Override
    public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
    {
        PluginTask task = taskSource.loadTask(getTaskClass());
        return new S3FileInput(task, taskIndex);
    }

    @VisibleForTesting
    static class S3InputStreamReopener
            implements ResumableInputStream.Reopener
    {
        private final Logger log = Exec.getLogger(S3InputStreamReopener.class);

        private final AmazonS3 client;
        private final GetObjectRequest request;
        private final long contentLength;
        private final RetryExecutor retryExec;

        public S3InputStreamReopener(AmazonS3 client, GetObjectRequest request, long contentLength)
        {
            this(client, request, contentLength, null);
        }

        public S3InputStreamReopener(AmazonS3 client, GetObjectRequest request, long contentLength, RetryExecutor retryExec)
        {
            this.client = client;
            this.request = request;
            this.contentLength = contentLength;
            this.retryExec = retryExec;
        }

        @Override
        public InputStream reopen(final long offset, final Exception closedCause)
                throws IOException
        {
            log.warn(format("S3 read failed. Retrying GET request with %,d bytes offset", offset), closedCause);
            request.setRange(offset, contentLength - 1); // [first, last]

            return new DefaultRetryable<S3ObjectInputStream>(format("Getting object '%s'", request.getKey())) {
                @Override
                public S3ObjectInputStream call()
                {
                    return client.getObject(request).getObjectContent();
                }
            }.executeWithCheckedException(retryExec, IOException.class);
        }
    }
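    // How the reopener is used: ResumableInputStream (see SingleFileProvider below)
    // wraps the S3 object body and calls reopen(offset, cause) when a read fails
    // mid-stream, issuing a ranged GET from the failed offset so the caller sees one
    // continuous stream. A sketch of the wiring, with names as in openNextWithHints():
    //
    //   InputStream in = new ResumableInputStream(
    //           object.getObjectContent(),
    //           new S3InputStreamReopener(client, request, objectSize, retryExec));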
    public class S3FileInput
            extends InputStreamFileInput
            implements TransactionalFileInput
    {
        public S3FileInput(PluginTask task, int taskIndex)
        {
            super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex));
        }

        public void abort()
        {
        }

        public TaskReport commit()
        {
            return Exec.newTaskReport();
        }

        @Override
        public void close()
        {
        }
    }

    @VisibleForTesting
    static void errorIfInternalParamsAreSet(PluginTask task)
    {
        if (task.getEndModifiedTime().isPresent()) {
            throw new ConfigException("'__end_modified_time' must not be set.");
        }
    }

    // TODO create single-file InputStreamFileInput utility
    private class SingleFileProvider
            implements InputStreamFileInput.Provider
    {
        private final AmazonS3 client;
        private final String bucket;
        private final Iterator<String> iterator;
        private final RetryExecutor retryExec;

        public SingleFileProvider(PluginTask task, int taskIndex)
        {
            this.client = newS3Client(task);
            this.bucket = task.getBucket();
            this.iterator = task.getFiles().get(taskIndex).iterator();
            this.retryExec = retryExecutorFrom(task);
        }

        @Override
        public InputStreamWithHints openNextWithHints() throws IOException
        {
            if (!iterator.hasNext()) {
                return null;
            }
            final String key = iterator.next();
            final GetObjectRequest request = new GetObjectRequest(bucket, key);

            S3Object object = new DefaultRetryable<S3Object>(format("Getting object '%s'", request.getKey())) {
                @Override
                public S3Object call()
                {
                    return client.getObject(request);
                }
            }.executeWithCheckedException(retryExec, IOException.class);

            long objectSize = object.getObjectMetadata().getContentLength();
            // Some plugin users are parsing this output to get the file list.
            // Keep it for now, but it might be removed in the future.
            LOGGER.info("Open S3Object with bucket [{}], key [{}], with size [{}]", bucket, key, objectSize);
            InputStream inputStream = new ResumableInputStream(object.getObjectContent(),
                    new S3InputStreamReopener(client, request, objectSize, retryExec));
            return new InputStreamWithHints(inputStream, String.format("s3://%s/%s", bucket, key));
        }

        @Override
        public void close()
        {
        }
    }
}
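// A concrete subclass only has to supply its task class; everything else has
// overridable defaults. A minimal sketch (hypothetical; the real S3FileInputPlugin
// may define its own PluginTask subtype and additional client settings):
//
//   public class S3FileInputPlugin extends AbstractS3FileInputPlugin
//   {
//       @Override
//       protected Class<? extends PluginTask> getTaskClass()
//       {
//           return PluginTask.class;
//       }
//   }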