package org.embulk.output.send_email; import java.util.*; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; import org.embulk.config.*; import org.embulk.spi.*; import javax.mail.*; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeBodyPart; import javax.mail.internet.MimeMessage; import javax.mail.internet.MimeMultipart; public class SendEmailOutputPlugin implements OutputPlugin { public interface PluginTask extends Task { @Config("file_type") @ConfigDefault("\"json\"") public String getFileType(); @Config("to") public List getTO(); @Config("cc") public List getCC(); @Config("from") public String getFrom(); @Config("password") @ConfigDefault("\"pass-word\"") public String getPassword(); @Config("port") public String getPort(); @Config("subject") public String getSubject(); @Config("auth") @ConfigDefault("false") public boolean getAuth(); @Config("host") public String getHost(); @Config("protocol") @ConfigDefault("TLSv1.2") public String getProtocol(); @Config("row") @ConfigDefault("-1") public int getRow(); @Config("username") @ConfigDefault("") public String getUserName(); @Config("smtp_enable") @ConfigDefault("true") public String getSmtpEnable(); } @Override public ConfigDiff transaction(ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) { PluginTask task = config.loadConfig(PluginTask.class); control.run(task.dump()); return Exec.newConfigDiff(); } @Override public ConfigDiff resume(TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) { throw new UnsupportedOperationException("myp output plugin does not support resuming"); } @Override public void cleanup(TaskSource taskSource, Schema schema, int taskCount, List successTaskReports) { } @Override public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex) { PageReader pageReader; PluginTask task = taskSource.loadTask(PluginTask.class); pageReader = new PageReader(schema); return new PageTransactionalOutput(task, pageReader, schema); } public static class PageTransactionalOutput implements TransactionalPageOutput { PluginTask task; PageReader pageReader; Schema schema; public PageTransactionalOutput(PluginTask task, PageReader pageReader, Schema schema) { this.task = task; this.pageReader = pageReader; this.schema = schema; } @Override public void add(Page page) { ArrayList> mapList = new ArrayList<>(); pageReader.setPage(page); int rows = task.getRow(); if (task.getRow() == -1) { rows = Integer.MAX_VALUE; } readingAndSettingDataToMap(pageReader, rows, mapList); pageReader.close(); try { Transport.send(prepareMessage(getSession(task), task.getFrom(), task.getTO(), task.getCC(), mapList, schema, task)); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } private void readingAndSettingDataToMap(PageReader pageReader, int rows, ArrayList> mapList) { while (pageReader.nextRecord() && rows-- > 0) { LinkedHashMap map = new LinkedHashMap<>(); pageReader.getSchema().visitColumns(new ColumnVisitor() { @Override public void booleanColumn(Column column) { if (pageReader.isNull(column)) { map.put(column.getName(), ""); } else { map.put(column.getName(), pageReader.getBoolean(column)); } } @Override public void longColumn(Column column) { if (pageReader.isNull(column)) { map.put(column.getName(), ""); } else { map.put(column.getName(), pageReader.getLong(column)); } } @Override public void doubleColumn(Column column) { if (pageReader.isNull(column)) { map.put(column.getName(), ""); } else { map.put(column.getName(), pageReader.getDouble(column)); } } @Override public void stringColumn(Column column) { if (pageReader.isNull(column)) { map.put(column.getName(), ""); } else { map.put(column.getName(), pageReader.getString(column)); } } @Override public void timestampColumn(Column column) { if (pageReader.isNull(column)) { map.put(column.getName(), ""); } else { map.put(column.getName(), pageReader.getTimestamp(column)); } } @Override public void jsonColumn(Column column) { if (pageReader.isNull(column)) { map.put(column.getName(), ""); } else { map.put(column.getName(), pageReader.getJson(column)); } } }); mapList.add(map); } } private Session getSession(PluginTask task) { try { Properties properties = new Properties(); properties.put("mail.smtp.auth", task.getAuth()); properties.put("mail.smtp.starttls.enable", task.getSmtpEnable()); properties.put("mail.smtp.ssl.protocols", task.getProtocol()); properties.put("mail.smtp.host", task.getHost()); properties.put("mail.smtp.port", task.getPort()); if(task.getAuth()) { return Session.getInstance(properties, new javax.mail.Authenticator() { protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(task.getFrom(), task.getPassword()); } }); }else{ properties.put("mail.smtp.user", task.getUserName()); return Session.getDefaultInstance(properties); } } catch (Exception e) { e.printStackTrace(); throw e; } } private static Message prepareMessage(Session session, String from, List to, List cc, ArrayList> mapList, Schema schema, PluginTask task) { try { String listStringTo = to.stream().map(Object::toString) .collect(Collectors.joining(",")); String listStringCC = cc.stream().map(Object::toString) .collect(Collectors.joining(",")); Message message = new MimeMessage(session); message.setFrom(new InternetAddress(from)); message.addRecipients(Message.RecipientType.CC, InternetAddress.parse(listStringCC)); message.addRecipients(Message.RecipientType.TO, InternetAddress.parse(listStringTo)); message.setSubject(task.getSubject()); if (task.getFileType().equalsIgnoreCase("html")) { setHtmlContent(message, mapList, schema); } else if (task.getFileType().equalsIgnoreCase("json")) { setJsonContent(message, mapList, schema); } return message; } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } private static void setHtmlContent(Message message, ArrayList> mapList, Schema schema) { try { MimeMultipart multipart = new MimeMultipart(); BodyPart messageBodyPart = new MimeBodyPart(); StringBuilder email = new StringBuilder(); email.append("" + ""); email.append(""); for (int i = 0; i < schema.size(); i++) { email.append(""); } email.append(""); for (int i = 0; i < mapList.size(); i++) { Map map = mapList.get(i); email.append(""); for (Map.Entry entry : map.entrySet()) { email.append(""); } email.append(""); } email.append("
"); email.append(schema.getColumn(i).getName().split("/")[0]); email.append("
"); email.append(entry.getValue()); email.append("
"); messageBodyPart.setContent(email.toString(), "text/html"); multipart.addBodyPart(messageBodyPart); message.setContent(multipart); } catch (Exception e) { System.out.println("Not able to send data"); throw new RuntimeException(e); } } private static void setJsonContent(Message message, ArrayList> mapList, Schema schema) { try { ObjectMapper objectMapper = new ObjectMapper(); String value = null; value = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(mapList); message.setText(value); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } @Override public void finish() { } @Override public void close() { } @Override public void abort() { } @Override public TaskReport commit() { return null; } } }