package org.embulk.output.azure_blob_storage; import com.google.common.base.Throwables; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; import org.embulk.config.Task; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; import org.embulk.spi.Buffer; import org.embulk.spi.Exec; import org.embulk.spi.FileOutputPlugin; import org.embulk.spi.TransactionalFileOutput; import org.slf4j.Logger; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.URISyntaxException; import java.security.InvalidKeyException; import java.util.List; public class AzureBlobStorageFileOutputPlugin implements FileOutputPlugin { public interface PluginTask extends Task { @Config("account_name") String getAccountName(); @Config("account_key") String getAccountKey(); @Config("container") String getContainer(); @Config("path_prefix") String getPathPrefix(); @Config("file_ext") String getFileNameExtension(); @Config("sequence_format") @ConfigDefault("\"%03d.%02d\"") String getSequenceFormat(); @Config("max_connection_retry") @ConfigDefault("5") // 5 times retry to connect sftp server if failed. int getMaxConnectionRetry(); } private static final Logger log = Exec.getLogger(AzureBlobStorageFileOutputPlugin.class); @Override public ConfigDiff transaction(ConfigSource config, int taskCount, FileOutputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); try { CloudBlobClient blobClient = newAzureClient(task.getAccountName(), task.getAccountKey()); String containerName = task.getContainer(); CloudBlobContainer container = blobClient.getContainerReference(containerName); if (!container.exists()) { log.info("container {} doesn't exists and created.", containerName); container.createIfNotExists(); } } catch (StorageException | URISyntaxException | ConfigException ex) { Throwables.propagate(ex); } return resume(task.dump(), taskCount, control); } @Override public ConfigDiff resume(TaskSource taskSource, int taskCount, FileOutputPlugin.Control control) { control.run(taskSource); return Exec.newConfigDiff(); } @Override public void cleanup(TaskSource taskSource, int taskCount, List successTaskReports) { } private static CloudBlobClient newAzureClient(String accountName, String accountKey) { String connectionString = "DefaultEndpointsProtocol=https;" + "AccountName=" + accountName + ";" + "AccountKey=" + accountKey; CloudStorageAccount account; try { account = CloudStorageAccount.parse(connectionString); } catch (InvalidKeyException | URISyntaxException ex) { throw new ConfigException(ex.getMessage()); } return account.createCloudBlobClient(); } @Override public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex) { final PluginTask task = taskSource.loadTask(PluginTask.class); return new AzureFileOutput(task, taskIndex); } public static class AzureFileOutput implements TransactionalFileOutput { private final String pathPrefix; private final String sequenceFormat; private final String pathSuffix; private final CloudBlobClient client; private final int maxConnectionRetry; private CloudBlobContainer container = null; private BufferedOutputStream output = null; private int fileIndex; private File file; private String filePath; private int taskIndex; public AzureFileOutput(PluginTask task, int taskIndex) { this.taskIndex = taskIndex; this.pathPrefix = task.getPathPrefix(); this.sequenceFormat = task.getSequenceFormat(); this.pathSuffix = task.getFileNameExtension(); this.client = newAzureClient(task.getAccountName(), task.getAccountKey()); this.maxConnectionRetry = task.getMaxConnectionRetry(); try { this.container = client.getContainerReference(task.getContainer()); } catch (URISyntaxException | StorageException ex) { Throwables.propagate(ex); } } @Override public void nextFile() { closeFile(); try { String suffix = pathSuffix; if (!suffix.startsWith(".")) { suffix = "." + suffix; } filePath = pathPrefix + String.format(sequenceFormat, taskIndex, fileIndex) + suffix; file = File.createTempFile(filePath, ".tmp"); log.info("Writing local file {}", file.getAbsolutePath()); output = new BufferedOutputStream(new FileOutputStream(file)); } catch (IOException ex) { throw Throwables.propagate(ex); } } private void closeFile() { if (output != null) { try { output.close(); fileIndex++; } catch (IOException ex) { throw Throwables.propagate(ex); } } } @Override public void add(Buffer buffer) { try { output.write(buffer.array(), buffer.offset(), buffer.limit()); } catch (IOException ex) { throw Throwables.propagate(ex); } finally { buffer.release(); } } @Override public void finish() { close(); try { Thread.sleep(1000 * 10); } catch (Exception ex) { // null; } if (filePath != null) { int count = 0; while (true) { try { CloudBlockBlob blob = container.getBlockBlobReference(filePath); log.info("Upload start {} to {}", file.getAbsolutePath(), filePath); blob.upload(new FileInputStream(file), file.length()); log.info("Upload completed {} to {}", file.getAbsolutePath(), filePath); log.info("Delete completed local file {}", file.getAbsolutePath()); if (!file.delete()) { throw new ConfigException("Couldn't delete local file " + file.getAbsolutePath()); } break; } catch (FileNotFoundException | URISyntaxException ex) { throw new ConfigException(ex); } catch (StorageException | IOException ex) { if (++count == maxConnectionRetry) { Throwables.propagate(ex); } log.warn("failed to connect SFTP server: " + ex.getMessage(), ex); try { long sleepTime = ((long) Math.pow(2, count) * 1000); log.warn("sleep in next connection retry: {} milliseconds", sleepTime); Thread.sleep(sleepTime); // milliseconds } catch (InterruptedException ex2) { // Ignore this exception because this exception is just about `sleep`. log.warn(ex2.getMessage(), ex2); } log.warn("retrying to connect SFTP server: " + count + " times"); } } } } @Override public void close() { closeFile(); } @Override public void abort() {} @Override public TaskReport commit() { return Exec.newTaskReport(); } } }