src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java in embulk-output-elasticsearch-0.3.1 vs src/test/java/org/embulk/output/elasticsearch/TestElasticsearchOutputPlugin.java in embulk-output-elasticsearch-0.4.0
- old
+ new
@@ -1,150 +1,86 @@
package org.embulk.output.elasticsearch;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.UnknownHostException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.security.GeneralSecurityException;
-
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.client.Client;
+import org.eclipse.jetty.http.HttpMethod;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigException;
+import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
-import org.embulk.config.ConfigSource;
+import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.AuthMethod;
+import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.Mode;
+import org.embulk.output.elasticsearch.ElasticsearchOutputPluginDelegate.PluginTask;
import org.embulk.spi.Exec;
import org.embulk.spi.OutputPlugin;
import org.embulk.spi.Page;
import org.embulk.spi.PageTestUtils;
import org.embulk.spi.Schema;
-import org.embulk.spi.time.Timestamp;
import org.embulk.spi.TransactionalPageOutput;
-import org.embulk.spi.TestPageBuilderReader.MockPageOutput;
+import org.embulk.spi.time.Timestamp;
import org.embulk.standards.CsvParserPlugin;
-import org.embulk.output.elasticsearch.ElasticsearchOutputPlugin.PluginTask;
+import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.embulk.output.elasticsearch.ElasticsearchTestUtils.ES_BULK_ACTIONS;
+import static org.embulk.output.elasticsearch.ElasticsearchTestUtils.ES_BULK_SIZE;
+import static org.embulk.output.elasticsearch.ElasticsearchTestUtils.ES_CONCURRENT_REQUESTS;
+import static org.embulk.output.elasticsearch.ElasticsearchTestUtils.ES_ID;
+import static org.embulk.output.elasticsearch.ElasticsearchTestUtils.ES_INDEX;
+import static org.embulk.output.elasticsearch.ElasticsearchTestUtils.ES_INDEX_TYPE;
+import static org.embulk.output.elasticsearch.ElasticsearchTestUtils.ES_NODES;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeNotNull;
public class TestElasticsearchOutputPlugin
{
- private static String ES_HOST;
- private static int ES_PORT;
- private static List ES_NODES;
- private static String ES_CLUSTER_NAME;
- private static String ES_INDEX;
- private static String ES_INDEX_TYPE;
- private static String ES_ID;
- private static int ES_BULK_ACTIONS;
- private static int ES_BULK_SIZE;
- private static int ES_CONCURRENT_REQUESTS;
- private static String PATH_PREFIX;
-
- private MockPageOutput pageOutput;
-
- final String ES_TEST_INDEX = "index_for_unittest";
- final String ES_TEST_INDEX2 = "index_for_unittest2";
- final String ES_TEST_ALIAS = "alias_for_unittest";
-
- /*
- * This test case requires environment variables
- * ES_HOST
- * ES_INDEX
- * ES_INDEX_TYPE
- */
@BeforeClass
public static void initializeConstant()
{
- ES_HOST = System.getenv("ES_HOST") != null ? System.getenv("ES_HOST") : "";
- ES_PORT = System.getenv("ES_PORT") != null ? Integer.valueOf(System.getenv("ES_PORT")) : 9300;
-
- ES_CLUSTER_NAME = System.getenv("ES_CLUSTER_NAME") != null ? System.getenv("ES_CLUSTER_NAME") : "";
- ES_INDEX = System.getenv("ES_INDEX");
- ES_INDEX_TYPE = System.getenv("ES_INDEX_TYPE");
- ES_ID = "id";
- ES_BULK_ACTIONS = System.getenv("ES_BULK_ACTIONS") != null ? Integer.valueOf(System.getenv("ES_BULK_ACTIONS")) : 1000;
- ES_BULK_SIZE = System.getenv("ES_BULK_SIZE") != null ? Integer.valueOf(System.getenv("ES_BULK_SIZE")) : 5242880;
- ES_CONCURRENT_REQUESTS = System.getenv("ES_CONCURRENT_REQUESTS") != null ? Integer.valueOf(System.getenv("ES_CONCURRENT_REQUESTS")) : 5;
-
- assumeNotNull(ES_HOST, ES_INDEX, ES_INDEX_TYPE);
-
- ES_NODES = Arrays.asList(ImmutableMap.of("host", ES_HOST, "port", ES_PORT));
-
- PATH_PREFIX = ElasticsearchOutputPlugin.class.getClassLoader().getResource("sample_01.csv").getPath();
}
-
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private ElasticsearchOutputPlugin plugin;
+ private ElasticsearchTestUtils utils;
@Before
- public void createResources()
- throws GeneralSecurityException, NoSuchMethodException,
- IllegalAccessException, InvocationTargetException
+ public void createResources() throws Exception
{
- ConfigSource config = config();
- plugin = new ElasticsearchOutputPlugin();
- PluginTask task = config.loadConfig(PluginTask.class);
- pageOutput = new MockPageOutput();
+ utils = new ElasticsearchTestUtils();
+ utils.initializeConstant();
+ PluginTask task = utils.config().loadConfig(PluginTask.class);
+ utils.prepareBeforeTest(task);
- Method createClient = ElasticsearchOutputPlugin.class.getDeclaredMethod("createClient", PluginTask.class);
- createClient.setAccessible(true);
- try (Client client = (Client) createClient.invoke(plugin, task)) {
- // Delete alias
- if (client.admin().cluster().state(new ClusterStateRequest()).actionGet().getState().getMetaData().hasAlias(ES_TEST_ALIAS)) {
- client.admin().indices().delete(new DeleteIndexRequest(ES_TEST_ALIAS)).actionGet();
- }
-
- // Delete index
- if (client.admin().cluster().state(new ClusterStateRequest()).actionGet().getState().getMetaData().hasIndex(ES_TEST_INDEX)) {
- client.admin().indices().delete(new DeleteIndexRequest(ES_TEST_INDEX)).actionGet();
- }
-
- if (client.admin().cluster().state(new ClusterStateRequest()).actionGet().getState().getMetaData().hasIndex(ES_TEST_INDEX2)) {
- client.admin().indices().delete(new DeleteIndexRequest(ES_TEST_INDEX2)).actionGet();
- }
- }
+ plugin = new ElasticsearchOutputPlugin();
}
@Test
public void testDefaultValues()
{
- ConfigSource config = config();
- ElasticsearchOutputPlugin.PluginTask task = config.loadConfig(PluginTask.class);
- assertEquals(ES_INDEX, task.getIndex());
+ PluginTask task = utils.config().loadConfig(PluginTask.class);
+ assertThat(task.getIndex(), is(ES_INDEX));
}
@Test
public void testDefaultValuesNull()
{
ConfigSource config = Exec.newConfigSource()
- .set("in", inputConfig())
- .set("parser", parserConfig(schemaConfig()))
+ .set("in", utils.inputConfig())
+ .set("parser", utils.parserConfig(utils.schemaConfig()))
.set("type", "elasticsearch")
.set("mode", "") // NULL
.set("nodes", ES_NODES)
- .set("cluster_name", ES_CLUSTER_NAME)
.set("index", ES_INDEX)
.set("index_type", ES_INDEX_TYPE)
.set("id", ES_ID)
.set("bulk_actions", ES_BULK_ACTIONS)
.set("bulk_size", ES_BULK_SIZE)
@@ -158,21 +94,22 @@
public List<TaskReport> run(TaskSource taskSource)
{
return Lists.newArrayList(Exec.newTaskReport());
}
});
- } catch (Throwable t) {
+ }
+ catch (Throwable t) {
if (t instanceof RuntimeException) {
assertTrue(t.getCause().getCause() instanceof ConfigException);
}
}
}
@Test
public void testTransaction()
{
- ConfigSource config = config();
+ ConfigSource config = utils.config();
Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema();
plugin.transaction(config, schema, 0, new OutputPlugin.Control()
{
@Override
public List<TaskReport> run(TaskSource taskSource)
@@ -184,11 +121,11 @@
}
@Test
public void testResume()
{
- ConfigSource config = config();
+ ConfigSource config = utils.config();
Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema();
PluginTask task = config.loadConfig(PluginTask.class);
plugin.resume(task.dump(), schema, 0, new OutputPlugin.Control()
{
@Override
@@ -200,282 +137,89 @@
}
@Test
public void testCleanup()
{
- ConfigSource config = config();
+ ConfigSource config = utils.config();
Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema();
PluginTask task = config.loadConfig(PluginTask.class);
plugin.cleanup(task.dump(), schema, 0, Arrays.asList(Exec.newTaskReport()));
// no error happens
}
@Test
- public void testOutputByOpen()
- throws GeneralSecurityException, IOException, NoSuchMethodException,
- IllegalAccessException, InvocationTargetException, ParseException
+ public void testOutputByOpen() throws Exception
{
- ConfigSource config = config();
+ ConfigSource config = utils.config();
Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema();
PluginTask task = config.loadConfig(PluginTask.class);
plugin.transaction(config, schema, 0, new OutputPlugin.Control() {
@Override
- public List<TaskReport> run(TaskSource taskSource) {
+ public List<TaskReport> run(TaskSource taskSource)
+ {
return Lists.newArrayList(Exec.newTaskReport());
}
});
TransactionalPageOutput output = plugin.open(task.dump(), schema, 0);
List<Page> pages = PageTestUtils.buildPage(runtime.getBufferAllocator(), schema, 1L, 32864L, Timestamp.ofEpochSecond(1422386629), Timestamp.ofEpochSecond(1422316800), true, 123.45, "embulk");
- assertEquals(1, pages.size());
+ assertThat(pages.size(), is(1));
for (Page page : pages) {
output.add(page);
}
output.finish();
output.commit();
+ Thread.sleep(1500); // Need to wait until index done
- Method createClient = ElasticsearchOutputPlugin.class.getDeclaredMethod("createClient", PluginTask.class);
- createClient.setAccessible(true);
- try (Client client = (Client) createClient.invoke(plugin, task)) {
- GetResponse response = client.prepareGet(ES_INDEX, ES_INDEX_TYPE, "1").execute().actionGet();
- assertTrue(response.isExists());
- if (response.isExists()) {
- Map<String, Object> map = response.getSourceAsMap();
- assertEquals(1, map.get("id"));
- assertEquals(32864, map.get("account"));
- assertEquals("2015-01-27T19:23:49.000Z", map.get("time"));
- assertEquals("2015-01-27T00:00:00.000Z", map.get("purchase"));
- assertEquals(true, map.get("flg"));
- assertEquals(123.45, map.get("score"));
- assertEquals("embulk", map.get("comment"));
+ try (Jetty92RetryHelper retryHelper = utils.createRetryHelper()) {
+ ElasticsearchHttpClient client = new ElasticsearchHttpClient();
+ Method sendRequest = ElasticsearchHttpClient.class.getDeclaredMethod("sendRequest", String.class, HttpMethod.class, PluginTask.class, Jetty92RetryHelper.class, String.class);
+ sendRequest.setAccessible(true);
+ String path = String.format("/%s/%s/_search", ES_INDEX, ES_INDEX_TYPE);
+ String sort = "{\"sort\" : \"id\"}";
+ JsonNode response = (JsonNode) sendRequest.invoke(client, path, HttpMethod.POST, task, retryHelper, sort);
+ assertThat(response.get("hits").get("total").asInt(), is(1));
+ if (response.size() > 0) {
+ JsonNode record = response.get("hits").get("hits").get(0).get("_source");
+ assertThat(record.get("id").asInt(), is(1));
+ assertThat(record.get("account").asInt(), is(32864));
+ assertThat(record.get("time").asText(), is("2015-01-27T19:23:49.000+0000"));
+ assertThat(record.get("purchase").asText(), is("2015-01-27T00:00:00.000+0000"));
+ assertThat(record.get("flg").asBoolean(), is(true));
+ assertThat(record.get("score").asDouble(), is(123.45));
+ assertThat(record.get("comment").asText(), is("embulk"));
}
}
}
@Test
public void testOpenAbort()
{
- ConfigSource config = config();
+ ConfigSource config = utils.config();
Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema();
PluginTask task = config.loadConfig(PluginTask.class);
TransactionalPageOutput output = plugin.open(task.dump(), schema, 0);
output.abort();
// no error happens.
}
@Test
- public void testCreateClientThrowsException()
- throws GeneralSecurityException, IOException, NoSuchMethodException,
- IllegalAccessException, InvocationTargetException
+ public void testMode()
{
- ConfigSource config = Exec.newConfigSource()
- .set("in", inputConfig())
- .set("parser", parserConfig(schemaConfig()))
- .set("type", "elasticsearch")
- .set("mode", "replace")
- .set("nodes", Arrays.asList(ImmutableMap.of("host", "unknown-host", "port", 9300)))
- .set("cluster_name", ES_CLUSTER_NAME)
- .set("index", ES_INDEX)
- .set("index_type", ES_INDEX_TYPE)
- .set("id", ES_ID)
- .set("bulk_actions", ES_BULK_ACTIONS)
- .set("bulk_size", ES_BULK_SIZE)
- .set("concurrent_requests", ES_CONCURRENT_REQUESTS
- );
- PluginTask task = config.loadConfig(PluginTask.class);
-
- Method createClient = ElasticsearchOutputPlugin.class.getDeclaredMethod("createClient", PluginTask.class);
- createClient.setAccessible(true);
- try (Client client = (Client) createClient.invoke(plugin, task)) {
- } catch (Throwable t) {
- if (t instanceof InvocationTargetException) {
- assertTrue(t.getCause().getCause() instanceof UnknownHostException);
- }
- }
+ assertThat(Mode.values().length, is(2));
+ assertThat(Mode.valueOf("INSERT"), is(Mode.INSERT));
}
@Test
- public void testMode()
+ public void testAuthMethod()
{
- assertEquals(2, ElasticsearchOutputPlugin.Mode.values().length);
- assertEquals(ElasticsearchOutputPlugin.Mode.INSERT, ElasticsearchOutputPlugin.Mode.valueOf("INSERT"));
+ assertThat(AuthMethod.values().length, is(2));
+ assertThat(AuthMethod.valueOf("BASIC"), is(AuthMethod.BASIC));
}
@Test(expected = ConfigException.class)
public void testModeThrowsConfigException()
{
- ElasticsearchOutputPlugin.Mode.fromString("non-exists-mode");
- }
-
- @Test
- public void testDeleteIndex()
- throws GeneralSecurityException, IOException, NoSuchMethodException,
- IllegalAccessException, InvocationTargetException
- {
- ConfigSource config = config();
- PluginTask task = config.loadConfig(PluginTask.class);
-
- Method createClient = ElasticsearchOutputPlugin.class.getDeclaredMethod("createClient", PluginTask.class);
- createClient.setAccessible(true);
- try (Client client = (Client) createClient.invoke(plugin, task)) {
- // Create Index
- client.admin().indices().create(new CreateIndexRequest(ES_TEST_INDEX)).actionGet();
-
- Method deleteIndex = ElasticsearchOutputPlugin.class.getDeclaredMethod("deleteIndex", String.class, Client.class);
- deleteIndex.setAccessible(true);
- deleteIndex.invoke(plugin, ES_TEST_INDEX, client);
-
- assertEquals(false, client.admin().cluster().state(new ClusterStateRequest()).actionGet().getState().getMetaData().hasIndex(ES_TEST_INDEX));
- }
- }
-
- @Test
- public void testAlias()
- throws GeneralSecurityException, IOException, NoSuchMethodException,
- IllegalAccessException, InvocationTargetException
- {
- ConfigSource config = config();
- PluginTask task = config.loadConfig(PluginTask.class);
-
- Method createClient = ElasticsearchOutputPlugin.class.getDeclaredMethod("createClient", PluginTask.class);
- createClient.setAccessible(true);
- try (Client client = (Client) createClient.invoke(plugin, task)) {
-
- Method isAlias = ElasticsearchOutputPlugin.class.getDeclaredMethod("isAlias", String.class, Client.class);
- isAlias.setAccessible(true);
-
- Method isExistsAlias = ElasticsearchOutputPlugin.class.getDeclaredMethod("isExistsAlias", String.class, Client.class);
- isExistsAlias.setAccessible(true);
-
- Method getIndexByAlias = ElasticsearchOutputPlugin.class.getDeclaredMethod("getIndexByAlias", String.class, Client.class);
- getIndexByAlias.setAccessible(true);
-
- Method reAssignAlias = ElasticsearchOutputPlugin.class.getDeclaredMethod("reAssignAlias", String.class, String.class, Client.class);
- reAssignAlias.setAccessible(true);
-
- assertEquals(false, isAlias.invoke(plugin, ES_TEST_ALIAS, client));
- assertEquals(false, isExistsAlias.invoke(plugin, ES_TEST_ALIAS, client));
- List<String> indicesBefore = (List<String>) getIndexByAlias.invoke(plugin, ES_TEST_ALIAS, client);
- assertEquals(0, indicesBefore.size());
-
- // Create Index
- client.admin().indices().create(new CreateIndexRequest(ES_TEST_INDEX)).actionGet();
- client.admin().indices().create(new CreateIndexRequest(ES_TEST_INDEX2)).actionGet();
- // Assign Alias
- reAssignAlias.invoke(plugin, ES_TEST_ALIAS, ES_TEST_INDEX, client);
-
- assertEquals(true, isAlias.invoke(plugin, ES_TEST_ALIAS, client));
- assertEquals(true, isExistsAlias.invoke(plugin, ES_TEST_ALIAS, client));
- List<String> indicesAfter = (List<String>) getIndexByAlias.invoke(plugin, ES_TEST_ALIAS, client);
- assertEquals(1, indicesAfter.size());
-
- // ReAssginAlias
- reAssignAlias.invoke(plugin, ES_TEST_ALIAS, ES_TEST_INDEX2, client);
- List<String> indicesReassign = (List<String>) getIndexByAlias.invoke(plugin, ES_TEST_ALIAS, client);
- assertEquals(1, indicesReassign.size());
- }
- }
-
- @Test
- public void testIsExistsIndex()
- throws GeneralSecurityException, IOException, NoSuchMethodException,
- IllegalAccessException, InvocationTargetException
- {
- ConfigSource config = config();
- PluginTask task = config.loadConfig(PluginTask.class);
-
- Method createClient = ElasticsearchOutputPlugin.class.getDeclaredMethod("createClient", PluginTask.class);
- createClient.setAccessible(true);
- try (Client client = (Client) createClient.invoke(plugin, task)) {
- Method isExistsIndex = ElasticsearchOutputPlugin.class.getDeclaredMethod("isExistsIndex", String.class, Client.class);
- isExistsIndex.setAccessible(true);
-
- // Delete index
- if (client.admin().cluster().state(new ClusterStateRequest()).actionGet().getState().getMetaData().hasIndex(ES_TEST_INDEX)) {
- client.admin().indices().delete(new DeleteIndexRequest(ES_TEST_INDEX)).actionGet();
- }
- assertEquals(false, isExistsIndex.invoke(plugin, ES_TEST_INDEX, client));
-
- // Create Index
- client.admin().indices().create(new CreateIndexRequest(ES_TEST_INDEX)).actionGet();
- assertEquals(true, isExistsIndex.invoke(plugin, ES_TEST_INDEX, client));
- }
- }
-
- @Test
- public void testGenerateNewIndex()
- {
- String newIndexName = plugin.generateNewIndexName(ES_INDEX);
- Timestamp time = Exec.getTransactionTime();
- assertEquals(ES_INDEX + new SimpleDateFormat("_yyyyMMdd-HHmmss").format(time.toEpochMilli()), newIndexName);
- }
-
- private byte[] convertInputStreamToByte(InputStream is) throws IOException
- {
- ByteArrayOutputStream bo = new ByteArrayOutputStream();
- byte [] buffer = new byte[1024];
- while(true) {
- int len = is.read(buffer);
- if(len < 0) {
- break;
- }
- bo.write(buffer, 0, len);
- }
- return bo.toByteArray();
- }
-
- private ConfigSource config()
- {
- return Exec.newConfigSource()
- .set("in", inputConfig())
- .set("parser", parserConfig(schemaConfig()))
- .set("type", "elasticsearch")
- .set("mode", "insert")
- .set("nodes", ES_NODES)
- .set("cluster_name", ES_CLUSTER_NAME)
- .set("index", ES_INDEX)
- .set("index_type", ES_INDEX_TYPE)
- .set("id", ES_ID)
- .set("bulk_actions", ES_BULK_ACTIONS)
- .set("bulk_size", ES_BULK_SIZE)
- .set("concurrent_requests", ES_CONCURRENT_REQUESTS);
- }
-
- private ImmutableMap<String, Object> inputConfig()
- {
- ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
- builder.put("type", "file");
- builder.put("path_prefix", PATH_PREFIX);
- builder.put("last_path", "");
- return builder.build();
- }
-
- private ImmutableMap<String, Object> parserConfig(ImmutableList<Object> schemaConfig)
- {
- ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
- builder.put("type", "csv");
- builder.put("newline", "CRLF");
- builder.put("delimiter", ",");
- builder.put("quote", "\"");
- builder.put("escape", "\"");
- builder.put("trim_if_not_quoted", false);
- builder.put("skip_header_lines", 1);
- builder.put("allow_extra_columns", false);
- builder.put("allow_optional_columns", false);
- builder.put("columns", schemaConfig);
- return builder.build();
- }
-
- private ImmutableList<Object> schemaConfig()
- {
- ImmutableList.Builder<Object> builder = new ImmutableList.Builder<>();
- builder.add(ImmutableMap.of("name", "id", "type", "long"));
- builder.add(ImmutableMap.of("name", "account", "type", "long"));
- builder.add(ImmutableMap.of("name", "time", "type", "timestamp", "format", "%Y-%m-%d %H:%M:%S"));
- builder.add(ImmutableMap.of("name", "purchase", "type", "timestamp", "format", "%Y%m%d"));
- builder.add(ImmutableMap.of("name", "flg", "type", "boolean"));
- builder.add(ImmutableMap.of("name", "score", "type", "double"));
- builder.add(ImmutableMap.of("name", "comment", "type", "string"));
- return builder.build();
+ Mode.fromString("non-exists-mode");
}
}