package org.embulk.input.azure_blob_storage; import java.util.List; import java.util.ArrayList; import java.util.Collections; import java.io.InputStream; import java.io.IOException; import java.security.InvalidKeyException; import java.net.URISyntaxException; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.base.Throwables; import org.embulk.config.Config; import org.embulk.config.ConfigException; import org.embulk.config.ConfigInject; import org.embulk.config.ConfigDefault; import org.embulk.config.Task; import org.embulk.config.TaskSource; import org.embulk.config.ConfigSource; import org.embulk.config.ConfigDiff; import org.embulk.config.TaskReport; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.BufferAllocator; import org.embulk.spi.TransactionalFileInput; import org.embulk.spi.util.InputStreamFileInput; import com.microsoft.azure.storage.*; import com.microsoft.azure.storage.blob.*; import org.slf4j.Logger; public class AzureBlobStorageFileInputPlugin implements FileInputPlugin { public interface PluginTask extends Task { @Config("account_name") String getAccountName(); @Config("account_key") String getAccountKey(); @Config("container") String getContainer(); @Config("path_prefix") String getPathPrefix(); @Config("last_path") @ConfigDefault("null") Optional getLastPath(); @Config("max_results") @ConfigDefault("5000") int getMaxResults(); List getFiles(); void setFiles(List files); @ConfigInject BufferAllocator getBufferAllocator(); } private static final Logger log = Exec.getLogger(AzureBlobStorageFileInputPlugin.class); @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { final PluginTask task = config.loadConfig(PluginTask.class); CloudBlobClient blobClient = newAzureClient(task.getAccountName(), task.getAccountKey()); task.setFiles(listFiles(blobClient, task)); return resume(task.dump(), task.getFiles().size(), control); } @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control) { PluginTask task = taskSource.loadTask(PluginTask.class); control.run(taskSource, taskCount); ConfigDiff configDiff = Exec.newConfigDiff(); List files = new ArrayList<> (task.getFiles()); if (files.isEmpty()) { if (task.getLastPath().isPresent()) { configDiff.set("last_path", task.getLastPath().get()); } } else { Collections.sort(files); configDiff.set("last_path", files.get(files.size() - 1)); } return configDiff; } @Override public void cleanup(TaskSource taskSource, int taskCount, List successTaskReports) { } private static CloudBlobClient newAzureClient(String accountName, String accountKey) { String connectionString = "DefaultEndpointsProtocol=http;" + "AccountName=" + accountName + ";" + "AccountKey=" + accountKey; CloudStorageAccount account; try { account = CloudStorageAccount.parse(connectionString); } catch (InvalidKeyException | URISyntaxException ex) { throw new ConfigException(ex); } return account.createCloudBlobClient(); } private List listFiles(CloudBlobClient client, PluginTask task) { if (task.getPathPrefix().equals("/")) { log.info("Listing files with prefix \"/\". This doesn't mean all files in a bucket. If you intend to read all files, use \"path_prefix: ''\" (empty string) instead."); } return listFilesWithPrefix(client, task.getContainer(), task.getPathPrefix(), task.getLastPath(), task.getMaxResults()); } private static List listFilesWithPrefix(CloudBlobClient client, String containerName, String prefix, Optional lastPath, int maxResults) { ImmutableList.Builder builder = ImmutableList.builder(); // It seems I can't cast lastKey to token by Azure SDK for Java String lastKey = lastPath.orNull(); ResultContinuation token = null; try { CloudBlobContainer container = client.getContainerReference(containerName); ResultSegment blobs; do { blobs = container.listBlobsSegmented(prefix, true, null, maxResults, token, null, null); log.debug(String.format("result count(include directory):%s continuationToken:%s", blobs.getLength() ,blobs.getContinuationToken())); for (ListBlobItem blobItem : blobs.getResults()) { if (blobItem instanceof CloudBlob) { CloudBlob blob = (CloudBlob) blobItem; if (blob.exists() && !blob.getUri().toString().endsWith("/")) { builder.add(blob.getName()); log.debug(String.format("name:%s, class:%s, uri:%s", blob.getName(), blob.getClass(), blob.getUri())); } } } token = blobs.getContinuationToken(); } while (blobs.getContinuationToken() != null); } catch (URISyntaxException | StorageException ex) { throw Throwables.propagate(ex); } return builder.build(); } @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { PluginTask task = taskSource.loadTask(PluginTask.class); return new AzureFileInput(task, taskIndex); } class AzureFileInput extends InputStreamFileInput implements TransactionalFileInput { public AzureFileInput (PluginTask task, int taskIndex) { super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex)); } public void abort() {} public TaskReport commit() { return Exec.newTaskReport(); } @Override public void close() {} } class SingleFileProvider implements InputStreamFileInput.Provider { private CloudBlobClient client; private final String containerName; private final String key; private boolean opened = false; public SingleFileProvider(PluginTask task, int taskIndex) { this.client = newAzureClient(task.getAccountName(), task.getAccountKey()); this.containerName = task.getContainer(); this.key = task.getFiles().get(taskIndex); } @Override public InputStream openNext() throws IOException { if (opened) { return null; } opened = true; InputStream inputStream = null; try { CloudBlobContainer container = client.getContainerReference(containerName); CloudBlob blob = container.getBlockBlobReference(key); inputStream = blob.openInputStream(); } catch (StorageException | URISyntaxException ex) { Throwables.propagate(ex); } return inputStream; } @Override public void close() {} } }