src/main/java/org/embulk/input/s3/FileList.java in embulk-input-s3-0.2.6 vs src/main/java/org/embulk/input/s3/FileList.java in embulk-input-s3-0.2.7

- old
+ new

@@ -36,10 +36,15 @@ String getPathMatchPattern(); @Config("total_file_count_limit") @ConfigDefault("2147483647") int getTotalFileCountLimit(); + + // TODO support more algorithms to combine tasks + @Config("min_task_size") + @ConfigDefault("0") + long getMinTaskSize(); } public static class Entry { private int index; @@ -67,26 +72,29 @@ private final OutputStream stream; private final List<Entry> entries = new ArrayList<>(); private String last = null; private int limitCount = Integer.MAX_VALUE; + private long minTaskSize = 1; private Pattern pathMatchPattern; private final ByteBuffer castBuffer = ByteBuffer.allocate(4); public Builder(Task task) { this(); - this.limitCount = task.getTotalFileCountLimit(); this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern()); + this.limitCount = task.getTotalFileCountLimit(); + this.minTaskSize = task.getMinTaskSize(); } public Builder(ConfigSource config) { this(); this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*")); this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE); + this.minTaskSize = config.get(long.class, "min_task_size", 0L); } public Builder() { binary = new ByteArrayOutputStream(); @@ -102,10 +110,16 @@ { this.limitCount = limitCount; return this; } + public Builder minTaskSize(long bytes) + { + this.minTaskSize = bytes; + return this; + } + public Builder pathMatchPattern(String pattern) { this.pathMatchPattern = Pattern.compile(pattern); return this; } @@ -161,13 +175,23 @@ return new FileList(binary.toByteArray(), getSplits(entries), Optional.fromNullable(last)); } private List<List<Entry>> getSplits(List<Entry> all) { - // TODO combine multiple entries into one task using some configuration parameters List<List<Entry>> tasks = new ArrayList<>(); + long currentTaskSize = 0; + List<Entry> currentTask = new ArrayList<>(); for (Entry entry : all) { - tasks.add(ImmutableList.of(entry)); + currentTask.add(entry); + currentTaskSize += entry.getSize(); // TODO consider to multiply the size by cost_per_byte, and add cost_per_file + if (currentTaskSize >= minTaskSize) { + tasks.add(currentTask); + currentTask = new ArrayList<>(); + currentTaskSize = 0; + } + } + if (!currentTask.isEmpty()) { + tasks.add(currentTask); } return tasks; } }