src/test/java/org/embulk/input/filesplit/PartialFileInputStream.java in embulk-output-oracle-0.4.0 vs src/test/java/org/embulk/input/filesplit/PartialFileInputStream.java in embulk-output-oracle-0.4.1

- old
+ new

@@ -1,154 +1,154 @@ -package org.embulk.input.filesplit; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.PushbackInputStream; - - -public class PartialFileInputStream extends InputStream -{ - private final PushbackInputStream original; - private long start; - private long end; - private long current; - private boolean eof; - - public PartialFileInputStream(InputStream original, long start, long end) - { - this.original = new PushbackInputStream(new BufferedInputStream(original)); - this.start = start; - this.end = end; - current = -1; - } - - @Override - public int read(byte[] b) throws IOException - { - return read(b, 0, b.length); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException - { - initializeIfNeeded(); - - if (eof) { - return -1; - } - - int read = original.read(b, off, len); - if (read < 0) { - eof = true; - return -1; - } - - current += read; - if (current >= end) { - for (int i = Math.max((int)(end - 1 - current + read), 0); i < read; i++) { - if (b[off + i] == '\n') { - eof = true; - return i + 1; - } - - if (b[off + i] == '\r') { - int next = (i < read ? b[off + i + 1] : prefetch()); - if (next != '\n') { - eof = true; - return i + 1; - } - } - } - } - - return read; - } - - @Override - public int read() throws IOException - { - initializeIfNeeded(); - - if (eof) { - return -1; - } - - int read = original.read(); - current++; - - if (read < 0) { - eof = true; - return -1; - } - - if (current >= end) { - if (read == '\n' || read == '\r' && prefetch() != '\n') { - eof = true; - } - } - - return read; - } - - @Override - public long skip(long n) throws IOException - { - throw new IOException("Skip not supported."); - /* - long skip = original.skip(n); - current += skip; - return skip; - */ - } - - @Override - public int available() throws IOException - { - return 0; - } - - @Override - public void close() throws IOException - { - original.close(); - } - - private void initializeIfNeeded() throws IOException - { - if (current >= start) { - return; - - } - if (start == 0) { - current = 0; - } else { - current = original.skip(--start); - if (current != start) { - throw new IOException("Cannot skip."); - } - - int c; - while ((c = original.read()) >= 0) { - start++; - current++; - - if (c == '\n' || c == '\r' && prefetch() != '\n') { - break; - } - } - } - - if (start >= end) { - eof = true; - } - } - - private int prefetch() throws IOException - { - int c = original.read(); - if (c >= 0) { - original.unread(c); - } - return c; - } -} +package org.embulk.input.filesplit; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; + + +public class PartialFileInputStream extends InputStream +{ + private final PushbackInputStream original; + private long start; + private long end; + private long current; + private boolean eof; + + public PartialFileInputStream(InputStream original, long start, long end) + { + this.original = new PushbackInputStream(new BufferedInputStream(original)); + this.start = start; + this.end = end; + current = -1; + } + + @Override + public int read(byte[] b) throws IOException + { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + initializeIfNeeded(); + + if (eof) { + return -1; + } + + int read = original.read(b, off, len); + if (read < 0) { + eof = true; + return -1; + } + + current += read; + if (current >= end) { + for (int i = Math.max((int)(end - 1 - current + read), 0); i < read; i++) { + if (b[off + i] == '\n') { + eof = true; + return i + 1; + } + + if (b[off + i] == '\r') { + int next = (i < read ? b[off + i + 1] : prefetch()); + if (next != '\n') { + eof = true; + return i + 1; + } + } + } + } + + return read; + } + + @Override + public int read() throws IOException + { + initializeIfNeeded(); + + if (eof) { + return -1; + } + + int read = original.read(); + current++; + + if (read < 0) { + eof = true; + return -1; + } + + if (current >= end) { + if (read == '\n' || read == '\r' && prefetch() != '\n') { + eof = true; + } + } + + return read; + } + + @Override + public long skip(long n) throws IOException + { + throw new IOException("Skip not supported."); + /* + long skip = original.skip(n); + current += skip; + return skip; + */ + } + + @Override + public int available() throws IOException + { + return 0; + } + + @Override + public void close() throws IOException + { + original.close(); + } + + private void initializeIfNeeded() throws IOException + { + if (current >= start) { + return; + + } + if (start == 0) { + current = 0; + } else { + current = original.skip(--start); + if (current != start) { + throw new IOException("Cannot skip."); + } + + int c; + while ((c = original.read()) >= 0) { + start++; + current++; + + if (c == '\n' || c == '\r' && prefetch() != '\n') { + break; + } + } + } + + if (start >= end) { + eof = true; + } + } + + private int prefetch() throws IOException + { + int c = original.read(); + if (c >= 0) { + original.unread(c); + } + return c; + } +}