src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPlugin.java in embulk-output-elasticsearch-0.1.8 vs src/main/java/org/embulk/output/elasticsearch/ElasticsearchOutputPlugin.java in embulk-output-elasticsearch-0.2.0
- old
+ new
@@ -1,52 +1,67 @@
package org.embulk.output.elasticsearch;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
+import org.elasticsearch.cluster.metadata.AliasOrIndex;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.embulk.config.TaskReport;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.indices.InvalidAliasNameException;
+
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
+import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
+import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
+import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.Exec;
import org.embulk.spi.OutputPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
-import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.TransactionalPageOutput;
+import org.embulk.spi.time.Timestamp;
+import org.embulk.spi.type.Types;
import org.slf4j.Logger;
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.inject.Inject;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-import static com.google.common.base.Preconditions.checkState;
-
public class ElasticsearchOutputPlugin
implements OutputPlugin
{
public interface NodeAddressTask
extends Task
@@ -60,20 +75,30 @@
}
public interface PluginTask
extends Task
{
+ @Config("mode")
+ @ConfigDefault("\"insert\"")
+ public Mode getMode();
+
@Config("nodes")
public List<NodeAddressTask> getNodes();
@Config("cluster_name")
@ConfigDefault("\"elasticsearch\"")
public String getClusterName();
@Config("index")
public String getIndex();
+ public void setIndex(String indexName);
+ @Config("alias")
+ @ConfigDefault("null")
+ public Optional<String> getAlias();
+ public void setAlias(Optional<String> aliasName);
+
@Config("index_type")
public String getType();
@Config("id")
@ConfigDefault("null")
@@ -106,25 +131,19 @@
{
final PluginTask task = config.loadConfig(PluginTask.class);
// confirm that a client can be initialized
try (Client client = createClient(task)) {
- }
-
- // check that id is included in the schema or not if the id is not null.
- if (task.getId().isPresent()) {
- String id = task.getId().get();
- boolean found = false;
- for (Column column : schema.getColumns()) {
- if (column.equals(id)) {
- found = true;
+ log.info(String.format("Executing plugin with '%s' mode.", task.getMode()));
+ if (task.getMode().equals(Mode.REPLACE)) {
+ task.setAlias(Optional.of(task.getIndex()));
+ task.setIndex(generateNewIndexName(task.getIndex()));
+ if (isExistsIndex(task.getAlias().orNull(), client) && !isAlias(task.getAlias().orNull(), client)) {
+ throw new ConfigException(String.format("Invalid alias name [%s], an index exists with the same name as the alias", task.getAlias().orNull()));
}
}
- checkState(found, "id is not included in column names of the Schema.");
- }
-
- try {
+ log.info(String.format("Inserting data into index[%s]", task.getIndex()));
control.run(task.dump());
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -143,23 +162,35 @@
@Override
public void cleanup(TaskSource taskSource,
Schema schema, int processorCount,
List<TaskReport> successTaskReports)
- { }
+ {
+ final PluginTask task = taskSource.loadTask(PluginTask.class);
+ if (task.getMode().equals(Mode.REPLACE)) {
+ try (Client client = createClient(task)) {
+ reAssignAlias(task.getAlias().orNull(), task.getIndex(), client);
+ } catch (IndexNotFoundException | InvalidAliasNameException e) {
+ throw new ConfigException(e);
+ }
+ }
+ }
private Client createClient(final PluginTask task)
{
// @see http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/client.html
- Settings settings = ImmutableSettings.settingsBuilder()
- .classLoader(Settings.class.getClassLoader())
+ Settings settings = Settings.settingsBuilder()
.put("cluster.name", task.getClusterName())
.build();
- TransportClient client = new TransportClient(settings);
+ TransportClient client = TransportClient.builder().settings(settings).build();
List<NodeAddressTask> nodes = task.getNodes();
for (NodeAddressTask node : nodes) {
- client.addTransportAddress(new InetSocketTransportAddress(node.getHost(), node.getPort()));
+ try {
+ client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(node.getHost()), node.getPort()));
+ } catch (UnknownHostException e) {
+ Throwables.propagate(e);
+ }
}
return client;
}
private BulkProcessor newBulkProcessor(final PluginTask task, final Client client)
@@ -206,11 +237,10 @@
@Override
public TransactionalPageOutput open(TaskSource taskSource, Schema schema,
int processorIndex)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
-
Client client = createClient(task);
BulkProcessor bulkProcessor = newBulkProcessor(task, client);
ElasticsearchPageOutput pageOutput = new ElasticsearchPageOutput(task, client, bulkProcessor);
pageOutput.open(schema);
return pageOutput;
@@ -222,10 +252,11 @@
private Client client;
private BulkProcessor bulkProcessor;
private PageReader pageReader;
+ private Column idColumn;
private final String index;
private final String type;
private final String id;
@@ -242,10 +273,11 @@
}
void open(final Schema schema)
{
pageReader = new PageReader(schema);
+ idColumn = (id == null) ? null : schema.lookupColumn(id);
}
@Override
public void add(Page page)
{
@@ -340,21 +372,45 @@
}
}
});
contextBuilder.endObject();
- bulkProcessor.add(newIndexRequest().source(contextBuilder));
+ bulkProcessor.add(newIndexRequest(getIdValue(idColumn)).source(contextBuilder));
} catch (IOException e) {
Throwables.propagate(e); // TODO error handling
}
}
}
- private IndexRequest newIndexRequest()
+ /**
+ * @param inputColumn
+ * @return
+ */
+ private String getIdValue(Column inputColumn) {
+ if (inputColumn == null) return null;
+ if (pageReader.isNull(inputColumn)) return null;
+ String idValue = null;
+ if (Types.STRING.equals(inputColumn.getType())) {
+ idValue = pageReader.getString(inputColumn);
+ } else if (Types.BOOLEAN.equals(inputColumn.getType())) {
+ idValue = pageReader.getBoolean(inputColumn) + "";
+ } else if (Types.DOUBLE.equals(inputColumn.getType())) {
+ idValue = pageReader.getDouble(inputColumn) + "";
+ } else if (Types.LONG.equals(inputColumn.getType())) {
+ idValue = pageReader.getLong(inputColumn) + "";
+ } else if (Types.TIMESTAMP.equals(inputColumn.getType())) {
+ idValue = pageReader.getTimestamp(inputColumn).toString();
+ } else {
+ idValue = null;
+ }
+ return idValue;
+ }
+
+ private IndexRequest newIndexRequest(String idValue)
{
- return Requests.indexRequest(index).type(type).id(id);
+ return Requests.indexRequest(index).type(type).id(idValue);
}
@Override
public void finish()
{
@@ -397,7 +453,94 @@
TaskReport report = Exec.newTaskReport();
// TODO
return report;
}
+ }
+
+ public enum Mode
+ {
+ INSERT,
+ REPLACE;
+
+ @JsonValue
+ @Override
+ public String toString()
+ {
+ return name().toLowerCase(Locale.ENGLISH);
+ }
+
+ @JsonCreator
+ public static Mode fromString(String value)
+ {
+ switch (value) {
+ case "insert":
+ return INSERT;
+ case "replace":
+ return REPLACE;
+ default:
+ throw new ConfigException(String.format("Unknown mode '%s'. Supported modes are insert, truncate_insert, replace", value));
+ }
+ }
+ }
+
+ private void reAssignAlias(String aliasName, String newIndexName, Client client)
+ throws IndexNotFoundException, InvalidAliasNameException
+ {
+ if (!isExistsAlias(aliasName, client)) {
+ client.admin().indices().prepareAliases()
+ .addAlias(newIndexName, aliasName)
+ .execute().actionGet();
+ log.info(String.format("Assigned alias[%s] to index[%s]", aliasName, newIndexName));
+ } else {
+ List<String> oldIndices = getIndexByAlias(aliasName, client);
+ client.admin().indices().prepareAliases()
+ .removeAlias(oldIndices.toArray(new String[oldIndices.size()]), aliasName)
+ .addAlias(newIndexName, aliasName)
+ .execute().actionGet();
+ log.info(String.format("Reassigned alias[%s] from index%s to index[%s]", aliasName, oldIndices, newIndexName));
+ for (String index : oldIndices) {
+ deleteIndex(index, client);
+ }
+ }
+ }
+
+ private void deleteIndex(String indexName, Client client)
+ {
+ client.admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet();
+ log.info(String.format("Deleted Index [%s]", indexName));
+ }
+
+ private List<String> getIndexByAlias(String aliasName, Client client)
+ {
+ ImmutableOpenMap<String, List<AliasMetaData>> map = client.admin().indices().getAliases(new GetAliasesRequest(aliasName))
+ .actionGet().getAliases();
+ List<String> indices = new ArrayList<>();
+ for (ObjectObjectCursor<String, List<AliasMetaData>> c : map) {
+ indices.add(c.key);
+ }
+
+ return indices;
+ }
+
+ private boolean isExistsAlias(String aliasName, Client client)
+ {
+ return client.admin().cluster().state(new ClusterStateRequest()).actionGet().getState().getMetaData().hasAlias(aliasName);
+ }
+
+ private boolean isExistsIndex(String indexName, Client client)
+ {
+ return client.admin().cluster().state(new ClusterStateRequest()).actionGet().getState().getMetaData().hasIndex(indexName);
+ }
+
+ private boolean isAlias(String aliasName, Client client)
+ {
+ AliasOrIndex aliasOrIndex = client.admin().cluster().state(new ClusterStateRequest()).actionGet().getState().getMetaData().getAliasAndIndexLookup().get(aliasName);
+ return aliasOrIndex != null && aliasOrIndex.isAlias();
+ }
+
+ public String generateNewIndexName(String indexName)
+ {
+ Timestamp time = Exec.getTransactionTime();
+ return indexName + new SimpleDateFormat("_yyyyMMdd-HHmmss").format(time.toEpochMilli());
}
}