src/main/java/org/embulk/decoder/unzip/UnzipDecoderPlugin.java in embulk-decoder-unzip-0.1.0 vs src/main/java/org/embulk/decoder/unzip/UnzipDecoderPlugin.java in embulk-decoder-unzip-0.1.1
- old
+ new
@@ -1,12 +1,10 @@
package org.embulk.decoder.unzip;
import java.io.InputStream;
import java.io.IOException;
-import com.google.common.base.Optional;
-
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigInject;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
@@ -21,24 +19,14 @@
implements DecoderPlugin
{
public interface PluginTask
extends Task
{
- // configuration option 1 (required integer)
- @Config("option1")
- public int getOption1();
+// @Config("skip_on_error")
+// @ConfigDefault("true")
+// public boolean skipOnError();
- // configuration option 2 (optional string, null is not allowed)
- @Config("option2")
- @ConfigDefault("\"myvalue\"")
- public String getOption2();
-
- // configuration option 3 (optional string, null is allowed)
- @Config("option3")
- @ConfigDefault("null")
- public Optional<String> getOption3();
-
@ConfigInject
public BufferAllocator getBufferAllocator();
}
@Override
@@ -52,35 +40,40 @@
@Override
public FileInput open(TaskSource taskSource, FileInput fileInput)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
- // Write your code here :)
- throw new UnsupportedOperationException("UnzipDecoderPlugin.open method is not implemented yet");
+ final FileInputInputStream files = new FileInputInputStream(fileInput);
- // If expect InputStream, you can use this code:
-
- //final FileInputInputStream files = new FileInputInputStream(fileInput);
- //
- //return new InputStreamFileInput(
- // task.getBufferAllocator(),
- // new InputStreamFileInput.Provider() {
- // public InputStream openNext() throws IOException
- // {
- // if (!files.nextFile()) {
- // return null;
- // }
- // return newDecoderInputStream(task, files);
- // }
- //
- // public void close() throws IOException
- // {
- // files.close();
- // }
- // });
+ InputStreamFileInput isfi = null;
+ try {
+ isfi = new InputStreamFileInput(
+ task.getBufferAllocator(),
+ new InputStreamFileInput.Provider() {
+ public InputStream openNext() throws IOException
+ {
+ if (!files.nextFile()) {
+ return null;
+ }
+ return newDecoderInputStream(task, files);
+ }
+
+ public void close() throws IOException
+ {
+ files.close();
+ }
+ });
+ } catch (Exception e) {
+// if(task.skipOnError()) {
+// System.out.println("skip: " + isfi.hintOfCurrentInputFileNameForLogging());
+// return null;
+// } else
+ throw new RuntimeException(e);
+ }
+ return isfi;
}
- //private static InputStream newDecoderInputStream(PluginTask task, InputStream file) throws IOException
- //{
- // return new MyInputStream(file);
- //}
+ private static InputStream newDecoderInputStream(PluginTask task, InputStream file) throws IOException
+ {
+ return new UnzipInputStream(file);
+ }
}