src/main/java/org/embulk/input/s3/FileList.java in embulk-input-s3-0.2.6 vs src/main/java/org/embulk/input/s3/FileList.java in embulk-input-s3-0.2.7
- old
+ new
@@ -36,10 +36,15 @@
String getPathMatchPattern();
@Config("total_file_count_limit")
@ConfigDefault("2147483647")
int getTotalFileCountLimit();
+
+ // TODO support more algorithms to combine tasks
+ @Config("min_task_size")
+ @ConfigDefault("0")
+ long getMinTaskSize();
}
public static class Entry
{
private int index;
@@ -67,26 +72,29 @@
private final OutputStream stream;
private final List<Entry> entries = new ArrayList<>();
private String last = null;
private int limitCount = Integer.MAX_VALUE;
+ private long minTaskSize = 1;
private Pattern pathMatchPattern;
private final ByteBuffer castBuffer = ByteBuffer.allocate(4);
public Builder(Task task)
{
this();
- this.limitCount = task.getTotalFileCountLimit();
this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern());
+ this.limitCount = task.getTotalFileCountLimit();
+ this.minTaskSize = task.getMinTaskSize();
}
public Builder(ConfigSource config)
{
this();
this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*"));
this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE);
+ this.minTaskSize = config.get(long.class, "min_task_size", 0L);
}
public Builder()
{
binary = new ByteArrayOutputStream();
@@ -102,10 +110,16 @@
{
this.limitCount = limitCount;
return this;
}
+ public Builder minTaskSize(long bytes)
+ {
+ this.minTaskSize = bytes;
+ return this;
+ }
+
public Builder pathMatchPattern(String pattern)
{
this.pathMatchPattern = Pattern.compile(pattern);
return this;
}
@@ -161,13 +175,23 @@
return new FileList(binary.toByteArray(), getSplits(entries), Optional.fromNullable(last));
}
private List<List<Entry>> getSplits(List<Entry> all)
{
- // TODO combine multiple entries into one task using some configuration parameters
List<List<Entry>> tasks = new ArrayList<>();
+ long currentTaskSize = 0;
+ List<Entry> currentTask = new ArrayList<>();
for (Entry entry : all) {
- tasks.add(ImmutableList.of(entry));
+ currentTask.add(entry);
+ currentTaskSize += entry.getSize(); // TODO consider to multiply the size by cost_per_byte, and add cost_per_file
+ if (currentTaskSize >= minTaskSize) {
+ tasks.add(currentTask);
+ currentTask = new ArrayList<>();
+ currentTaskSize = 0;
+ }
+ }
+ if (!currentTask.isEmpty()) {
+ tasks.add(currentTask);
}
return tasks;
}
}