package org.msgpack.jruby; import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.RubyString; import org.jruby.RubyObject; import org.jruby.RubyHash; import org.jruby.RubyNumeric; import org.jruby.RubyIO; import org.jruby.exceptions.RaiseException; import org.jruby.runtime.builtin.IRubyObject; import org.jruby.runtime.Block; import org.jruby.runtime.ObjectAllocator; import org.jruby.runtime.ThreadContext; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.util.ByteList; import org.jruby.ext.stringio.StringIO; import static org.jruby.runtime.Visibility.PRIVATE; @JRubyClass(name="MessagePack::Unpacker") public class Unpacker extends RubyObject { private IRubyObject stream; private IRubyObject data; private Decoder decoder; private final RubyClass underflowErrorClass; public Unpacker(Ruby runtime, RubyClass type) { super(runtime, type); this.underflowErrorClass = runtime.getModule("MessagePack").getClass("UnderflowError"); } static class UnpackerAllocator implements ObjectAllocator { public IRubyObject allocate(Ruby runtime, RubyClass klass) { return new Unpacker(runtime, klass); } } @JRubyMethod(name = "initialize", optional = 1, visibility = PRIVATE) public IRubyObject initialize(ThreadContext ctx, IRubyObject[] args) { if (args.length > 0) { if (args[args.length - 1] instanceof RubyHash) { //TODO: symbolize_keys } else if (!(args[0] instanceof RubyHash)) { setStream(ctx, args[0]); } } return this; } @JRubyMethod(required = 2) public IRubyObject execute(ThreadContext ctx, IRubyObject data, IRubyObject offset) { return executeLimit(ctx, data, offset, null); } @JRubyMethod(name = "execute_limit", required = 3) public IRubyObject executeLimit(ThreadContext ctx, IRubyObject str, IRubyObject off, IRubyObject lim) { RubyString input = str.asString(); int offset = RubyNumeric.fix2int(off); int limit = lim == null || lim.isNil() ? -1 : RubyNumeric.fix2int(lim); ByteList byteList = input.getByteList(); if (limit == -1) { limit = byteList.length() - offset; } Decoder decoder = new Decoder(ctx.getRuntime(), byteList.unsafeBytes(), byteList.begin() + offset, limit); try { this.data = null; this.data = decoder.next(); } catch (RaiseException re) { if (re.getException().getType() != underflowErrorClass) { throw re; } } return ctx.getRuntime().newFixnum(decoder.offset()); } @JRubyMethod(name = "data") public IRubyObject getData(ThreadContext ctx) { if (data == null) { return ctx.getRuntime().getNil(); } else { return data; } } @JRubyMethod(name = "finished?") public IRubyObject finished_p(ThreadContext ctx) { return data == null ? ctx.getRuntime().getFalse() : ctx.getRuntime().getTrue(); } @JRubyMethod(required = 1) public IRubyObject feed(ThreadContext ctx, IRubyObject data) { ByteList byteList = data.asString().getByteList(); if (decoder == null) { decoder = new Decoder(ctx.getRuntime(), byteList.unsafeBytes(), byteList.begin(), byteList.length()); } else { decoder.feed(byteList.unsafeBytes(), byteList.begin(), byteList.length()); } return ctx.getRuntime().getNil(); } @JRubyMethod(name = "feed_each", required = 1) public IRubyObject feedEach(ThreadContext ctx, IRubyObject data, Block block) { feed(ctx, data); each(ctx, block); return ctx.getRuntime().getNil(); } @JRubyMethod public IRubyObject each(ThreadContext ctx, Block block) { if (block.isGiven()) { if (decoder != null) { try { while (decoder.hasNext()) { block.yield(ctx, decoder.next()); } } catch (RaiseException re) { if (re.getException().getType() != underflowErrorClass) { throw re; } } } return this; } else { return callMethod(ctx, "to_enum"); } } @JRubyMethod public IRubyObject fill(ThreadContext ctx) { return ctx.getRuntime().getNil(); } @JRubyMethod public IRubyObject reset(ThreadContext ctx) { if (decoder != null) { decoder.reset(); } return ctx.getRuntime().getNil(); } @JRubyMethod(name = "stream") public IRubyObject getStream(ThreadContext ctx) { if (stream == null) { return ctx.getRuntime().getNil(); } else { return stream; } } @JRubyMethod(name = "stream=", required = 1) public IRubyObject setStream(ThreadContext ctx, IRubyObject stream) { RubyString str; if (stream instanceof StringIO) { str = stream.callMethod(ctx, "string").asString(); } else if (stream instanceof RubyIO) { str = stream.callMethod(ctx, "read").asString(); } else { throw ctx.getRuntime().newTypeError(stream, "IO"); } ByteList byteList = str.getByteList(); this.stream = stream; this.decoder = null; this.decoder = new Decoder(ctx.getRuntime(), byteList.unsafeBytes(), byteList.begin(), byteList.length()); return getStream(ctx); } }