ext/extzstd_stream.c in extzstd-0.1.1 vs ext/extzstd_stream.c in extzstd-0.2

- old
+ new

@@ -1,178 +1,252 @@ #include "extzstd.h" #include "extzstd_nogvls.h" #include <errno.h> +enum { + EXT_PARTIAL_READ_SIZE = 256 * 1024, /* 256 KiB */ + EXT_READ_GROWUP_SIZE = 256 * 1024, /* 256 KiB */ + EXT_READ_DOUBLE_GROWUP_LIMIT_SIZE = 4 * 1024 * 1024, /* 4 MiB */ +}; + +static inline VALUE +aux_str_buf_recycle(VALUE str, size_t capacity) +{ + if (!RTEST(str) || rb_obj_frozen_p(str) || !rb_type_p(str, RUBY_T_STRING)) { + return rb_str_buf_new(capacity); + } else { + return aux_str_modify_expand(str, capacity); + } +} + +static ID id_op_lsh, id_read; + /* - * class Zstd::StreamEncoder + * class Zstd::Encoder */ static VALUE cStreamEncoder; +struct encoder +{ + ZSTD_CStream *context; + VALUE outport; + VALUE predict; + VALUE destbuf; + int reached_eof; +}; + static void -enc_free(void *p) +enc_gc_mark(void *pp) { - ZSTD_freeCStream(p); + if (pp) { + struct encoder *p = (struct encoder *)pp; + rb_gc_mark(p->outport); + rb_gc_mark(p->predict); + rb_gc_mark(p->destbuf); + } } +static void +enc_free(void *pp) +{ + if (pp) { + struct encoder *p = (struct encoder *)pp; + if (p->context) { + ZSTD_freeCStream(p->context); + p->context = NULL; + } + xfree(p); + } +} + AUX_IMPLEMENT_CONTEXT( - ZSTD_CStream, encoder_type, "extzstd.Zstd::StreamEncoder", - encoder_alloc_dummy, NULL, enc_free, NULL, + struct encoder, encoder_type, "extzstd.Zstd::Encoder", + encoder_alloc_dummy, enc_gc_mark, enc_free, NULL, getencoderp, getencoder, encoder_p); static VALUE enc_alloc(VALUE mod) { - VALUE obj = TypedData_Wrap_Struct(mod, &encoder_type, NULL); + struct encoder *p; + VALUE obj = TypedData_Make_Struct(mod, struct encoder, &encoder_type, p); + p->outport = Qnil; + p->predict = Qnil; + p->destbuf = Qnil; + return obj; +} - ZSTD_CStream *p = ZSTD_createCStream(); - if (!p) { - rb_gc(); - p = ZSTD_createCStream(); - if (!p) { - errno = ENOMEM; - rb_sys_fail("failed ZSTD_createCStream()"); - } +static struct encoder * +encoder_context(VALUE self) +{ + struct encoder *p = getencoder(self); + if (!p->context) { + rb_raise(rb_eTypeError, + "wrong initialized context - #<%s:%p>", + rb_obj_classname(self), (void *)self); } - DATA_PTR(obj) = p; - - return obj; + return p; } /* * call-seq: - * initialize(compression_parameters, predict) + * initialize(outport, compression_parameters = nil, predict = nil) */ static VALUE -enc_init(VALUE self, VALUE params, VALUE predict) +enc_init(int argc, VALUE argv[], VALUE self) { /* * ZSTDLIB_API size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel); * ZSTDLIB_API size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel); * ZSTDLIB_API size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize, * ZSTD_parameters params, unsigned long long pledgedSrcSize); */ - ZSTD_CStream *p = getencoder(self); + VALUE outport, params, predict; + switch (argc) { + case 1: + outport = argv[0]; + params = predict = Qnil; + break; + case 2: + outport = argv[0]; + params = argv[1]; + predict = Qnil; + break; + case 3: + outport = argv[0]; + params = argv[1]; + predict = argv[2]; + break; + default: + rb_error_arity(argc, 1, 3); + } + + struct encoder *p = getencoder(self); + if (p->context) { + rb_raise(rb_eTypeError, + "initialized already - #<%s:%p>", + rb_obj_classname(self), (void *)self); + } + + AUX_TRY_WITH_GC( + p->context = ZSTD_createCStream(), + "failed ZSTD_createCStream()"); + const void *predictp; size_t predictsize; if (NIL_P(predict)) { predictp = NULL; predictsize = 0; } else { rb_check_type(predict, RUBY_T_STRING); + predict = rb_str_new_frozen(predict); RSTRING_GETMEM(predict, predictp, predictsize); } if (extzstd_params_p(params)) { ZSTD_parameters *paramsp = extzstd_getparams(params); - size_t s = ZSTD_initCStream_advanced(p, predictp, predictsize, *paramsp, 0); + size_t s = ZSTD_initCStream_advanced(p->context, predictp, predictsize, *paramsp, 0); extzstd_check_error(s); } else { - size_t s = ZSTD_initCStream_usingDict(p, predictp, predictsize, aux_num2int(params, 1)); + size_t s = ZSTD_initCStream_usingDict(p->context, predictp, predictsize, aux_num2int(params, 1)); extzstd_check_error(s); } + + p->predict = predict; + p->outport = outport; + return self; } static VALUE -enc_update(VALUE self, VALUE src, VALUE srcoff, VALUE dest, VALUE maxdest) +enc_write(VALUE self, VALUE src) { /* * ZSTDLIB_API size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); */ - ZSTD_CStream *p = getencoder(self); - + struct encoder *p = encoder_context(self); rb_check_type(src, RUBY_T_STRING); - const char *q = RSTRING_PTR(src); - const char *qq = RSTRING_END(src); - const char *q1 = q + NUM2SIZET(srcoff); - if (q1 < q || q1 > qq) { - rb_raise(rb_eArgError, - "``srcoff'' is out of range (given %"PRIuSIZE", expect 0..%"PRIuSIZE")", - q1 - q, qq - q); - } + ZSTD_inBuffer input = { RSTRING_PTR(src), RSTRING_LEN(src), 0 }; - rb_check_type(dest, RUBY_T_STRING); - rb_str_modify(dest); - rb_str_set_len(dest, 0); - rb_str_modify_expand(dest, NUM2SIZET(maxdest)); - rb_obj_infect(self, src); - rb_obj_infect(dest, self); - char *r = RSTRING_PTR(dest); - const char *rr = r + rb_str_capacity(dest); + while (input.pos < input.size) { + p->destbuf = aux_str_buf_recycle(p->destbuf, ZSTD_CStreamOutSize() * 2); + rb_str_set_len(p->destbuf, 0); + rb_obj_infect(self, src); + rb_obj_infect(p->destbuf, self); + ZSTD_outBuffer output = { RSTRING_PTR(p->destbuf), rb_str_capacity(p->destbuf), 0 }; + size_t s = ZSTD_compressStream(p->context, &output, &input); + extzstd_check_error(s); + rb_str_set_len(p->destbuf, output.pos); - ZSTD_inBuffer input = { q, qq - q, q1 - q }; - ZSTD_outBuffer output = { r, rr - r, 0 }; - size_t s = ZSTD_compressStream(p, &output, &input); - extzstd_check_error(s); - rb_str_set_len(dest, output.pos); - if (input.pos == input.size) { - return Qnil; - } else { - return SIZET2NUM(input.pos); + // TODO: 例外や帯域脱出した場合の挙動は? + // TODO: src の途中経過状態を保存するべきか? + AUX_FUNCALL(p->outport, id_op_lsh, p->destbuf); } + + return self; } static VALUE -enc_flush(VALUE self, VALUE dest, VALUE maxdest) +enc_sync(VALUE self) { /* * ZSTDLIB_API size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output); */ - ZSTD_CStream *p = getencoder(self); + struct encoder *p = encoder_context(self); + aux_str_buf_recycle(p->destbuf, ZSTD_CStreamOutSize()); + rb_str_set_len(p->destbuf, 0); + rb_obj_infect(p->destbuf, self); + ZSTD_outBuffer output = { RSTRING_PTR(p->destbuf), rb_str_capacity(p->destbuf), 0 }; + size_t s = ZSTD_flushStream(p->context, &output); + extzstd_check_error(s); + rb_str_set_len(p->destbuf, output.pos); - rb_check_type(dest, RUBY_T_STRING); - rb_str_modify(dest); - rb_str_set_len(dest, 0); - rb_str_modify_expand(dest, NUM2SIZET(maxdest)); + AUX_FUNCALL(p->outport, id_op_lsh, p->destbuf); - ZSTD_outBuffer output = { RSTRING_PTR(dest), rb_str_capacity(dest), 0 }; - size_t s = ZSTD_flushStream(p, &output); - extzstd_check_error(s); - if (output.size > 0) { - rb_str_set_len(dest, output.pos); - return dest; - } else { - return Qnil; - } + return self; } static VALUE -enc_end(VALUE self, VALUE dest, VALUE maxdest) +enc_close(VALUE self) { /* * ZSTDLIB_API size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output); */ - ZSTD_CStream *p = getencoder(self); - rb_check_type(dest, RUBY_T_STRING); - rb_str_modify(dest); - rb_str_set_len(dest, 0); - rb_str_modify_expand(dest, NUM2SIZET(maxdest)); - - ZSTD_outBuffer output = { RSTRING_PTR(dest), rb_str_capacity(dest), 0 }; - size_t s = ZSTD_endStream(p, &output); + struct encoder *p = encoder_context(self); + aux_str_buf_recycle(p->destbuf, ZSTD_CStreamOutSize()); + rb_str_set_len(p->destbuf, 0); + rb_obj_infect(p->destbuf, self); + ZSTD_outBuffer output = { RSTRING_PTR(p->destbuf), rb_str_capacity(p->destbuf), 0 }; + size_t s = ZSTD_endStream(p->context, &output); extzstd_check_error(s); - if (output.size > 0) { - rb_str_set_len(dest, output.pos); - return dest; - } else { - return Qnil; - } + rb_str_set_len(p->destbuf, output.pos); + + AUX_FUNCALL(p->outport, id_op_lsh, p->destbuf); + + p->reached_eof = 1; + + return Qnil; } static VALUE +enc_eof(VALUE self) +{ + return (encoder_context(self)->reached_eof == 0 ? Qfalse : Qtrue); +} + +static VALUE enc_reset(VALUE self, VALUE pledged_srcsize) { /* * ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize); */ - size_t s = ZSTD_resetCStream(getencoder(self), NUM2ULL(pledged_srcsize)); + size_t s = ZSTD_resetCStream(encoder_context(self)->context, NUM2ULL(pledged_srcsize)); extzstd_check_error(s); return self; } static VALUE @@ -180,134 +254,314 @@ { /* * ZSTDLIB_API size_t ZSTD_sizeof_CStream(const ZSTD_CStream* zcs); */ - size_t s = ZSTD_sizeof_CStream(getencoder(self)); + size_t s = ZSTD_sizeof_CStream(encoder_context(self)->context); extzstd_check_error(s); return SIZET2NUM(s); } static void init_encoder(void) { - cStreamEncoder = rb_define_class_under(extzstd_mZstd, "StreamEncoder", rb_cObject); + cStreamEncoder = rb_define_class_under(extzstd_mZstd, "Encoder", rb_cObject); rb_define_alloc_func(cStreamEncoder, enc_alloc); rb_define_const(cStreamEncoder, "INSIZE", SIZET2NUM(ZSTD_CStreamInSize())); rb_define_const(cStreamEncoder, "OUTSIZE", SIZET2NUM(ZSTD_CStreamOutSize())); - rb_define_method(cStreamEncoder, "initialize", enc_init, 2); - rb_define_method(cStreamEncoder, "update", enc_update, 4); - rb_define_method(cStreamEncoder, "flush", enc_flush, 2); - rb_define_method(cStreamEncoder, "end", enc_end, 2); + rb_define_method(cStreamEncoder, "initialize", enc_init, -1); + rb_define_method(cStreamEncoder, "write", enc_write, 1); + rb_define_method(cStreamEncoder, "sync", enc_sync, 0); + rb_define_method(cStreamEncoder, "close", enc_close, 0); + rb_define_method(cStreamEncoder, "eof", enc_eof, 0); + rb_define_alias(cStreamEncoder, "eof?", "eof"); rb_define_method(cStreamEncoder, "reset", enc_reset, 1); rb_define_method(cStreamEncoder, "sizeof", enc_sizeof, 0); + rb_define_alias(cStreamEncoder, "<<", "write"); + rb_define_alias(cStreamEncoder, "update", "write"); + rb_define_alias(cStreamEncoder, "flush", "sync"); + rb_define_alias(cStreamEncoder, "end", "close"); + rb_define_alias(cStreamEncoder, "finish", "close"); } /* - * class Zstd::StreamDecoder + * class Zstd::Decoder */ static VALUE cStreamDecoder; +struct decoder +{ + ZSTD_DStream *context; + VALUE inport; + VALUE readbuf; + VALUE predict; + ZSTD_inBuffer inbuf; + int reached_eof; +}; + static void -dec_free(void *p) +dec_mark(void *pp) { - ZSTD_freeDStream(p); + struct decoder *p = (struct decoder *)pp; + rb_gc_mark(p->inport); + rb_gc_mark(p->readbuf); + rb_gc_mark(p->predict); } +static void +dec_free(void *pp) +{ + struct decoder *p = (struct decoder *)pp; + if (p->context) { + ZSTD_freeDStream(p->context); + p->context = NULL; + } + xfree(p); +} + AUX_IMPLEMENT_CONTEXT( - ZSTD_DStream, decoder_type, "extzstd.Zstd::StreamDecoder", - decoder_alloc_dummy, NULL, dec_free, NULL, + struct decoder, decoder_type, "extzstd.Zstd::Decoder", + decoder_alloc_dummy, dec_mark, dec_free, NULL, getdecoderp, getdecoder, decoder_p); -static VALUE -dec_alloc(VALUE mod) +static struct decoder * +decoder_context(VALUE self) { - VALUE obj = TypedData_Wrap_Struct(mod, &decoder_type, NULL); - - ZSTD_DStream *p = ZSTD_createDStream(); - if (!p) { - rb_gc(); - p = ZSTD_createDStream(); - if (!p) { - errno = ENOMEM; - rb_sys_fail("failed ZSTD_createDStream()"); - } + struct decoder *p = getdecoder(self); + if (!p->context) { + rb_raise(rb_eTypeError, + "uninitialized context - #<%s:%p>", + rb_obj_classname(self), (void *)self); } - DATA_PTR(obj) = p; + return p; +} +static VALUE +dec_alloc(VALUE mod) +{ + struct decoder *p; + VALUE obj = TypedData_Make_Struct(mod, struct decoder, &decoder_type, p); return obj; } /* * call-seq: - * initialize(predict) + * initialize(inport, predict = Qnil) */ static VALUE -dec_init(VALUE self, VALUE predict) +dec_init(int argc, VALUE argv[], VALUE self) { /* * ZSTDLIB_API size_t ZSTD_initDStream(ZSTD_DStream* zds); * ZSTDLIB_API size_t ZSTD_initDStream_usingDict(ZSTD_DStream* zds, const void* dict, size_t dictSize); */ - ZSTD_DStream *p = getdecoder(self); + VALUE inport, predict; + switch (argc) { + case 1: + inport = argv[0]; + predict = Qnil; + break; + case 2: + inport = argv[0]; + predict = argv[1]; + break; + default: + rb_error_arity(argc, 1, 2); + } + + struct decoder *p = getdecoder(self); + if (p->context) { + rb_raise(rb_eTypeError, + "initialized context already - #<%s:%p>", + rb_obj_classname(self), (void *)self); + } + + AUX_TRY_WITH_GC( + p->context = ZSTD_createDStream(), + "failed ZSTD_createDStream()"); + if (NIL_P(predict)) { - size_t s = ZSTD_initDStream(p); + size_t s = ZSTD_initDStream(p->context); extzstd_check_error(s); } else { rb_check_type(predict, RUBY_T_STRING); - size_t s = ZSTD_initDStream_usingDict(p, RSTRING_PTR(predict), RSTRING_LEN(predict)); + predict = rb_str_new_frozen(predict); + size_t s = ZSTD_initDStream_usingDict(p->context, RSTRING_PTR(predict), RSTRING_LEN(predict)); extzstd_check_error(s); } + + p->inport = inport; + p->predict = predict; + return self; } +static int +dec_read_fetch(VALUE o, struct decoder *p) +{ + if (!p->inbuf.src || NIL_P(p->readbuf) || p->inbuf.pos >= RSTRING_LEN(p->readbuf)) { + p->readbuf = aux_str_buf_recycle(p->readbuf, EXT_PARTIAL_READ_SIZE); + VALUE st = AUX_FUNCALL(p->inport, id_read, INT2FIX(EXT_PARTIAL_READ_SIZE), p->readbuf); + if (NIL_P(st)) { return -1; } + rb_check_type(st, RUBY_T_STRING); + p->readbuf = st; + rb_obj_infect(o, p->readbuf); + p->inbuf.size = RSTRING_LEN(p->readbuf); + p->inbuf.pos = 0; + } + + p->inbuf.src = RSTRING_PTR(p->readbuf); + + return 0; +} + +static size_t +dec_read_decode(VALUE o, struct decoder *p, char *buf, ssize_t size) +{ + if (p->reached_eof != 0) { + return 0; + } + + ZSTD_outBuffer output = { buf, size, 0 }; + + while (size < 0 || output.pos < size) { + if (dec_read_fetch(o, p) != 0) { + if (p->reached_eof == 0) { + rb_raise(rb_eRuntimeError, + "unexpected EOF - #<%s:%p>", + rb_obj_classname(p->inport), (void *)p->inport); + } + + break; + } + + rb_thread_check_ints(); + size_t s = ZSTD_decompressStream(p->context, &output, &p->inbuf); + extzstd_check_error(s); + if (s == 0) { + p->reached_eof = 1; + break; + } + } + + return output.pos; +} + +static void +dec_read_args(int argc, VALUE argv[], VALUE self, VALUE *buf, ssize_t *size) +{ + switch (argc) { + case 0: + *size = -1; + *buf = rb_str_buf_new(EXT_READ_GROWUP_SIZE); + break; + case 1: + case 2: + { + if (NIL_P(argv[0])) { + *size = -1; + + if (argc == 1) { + *buf = rb_str_buf_new(EXT_READ_GROWUP_SIZE); + } else { + rb_check_type(argv[1], RUBY_T_STRING); + *buf = aux_str_modify_expand(argv[1], EXT_READ_GROWUP_SIZE); + rb_str_set_len(*buf, 0); + } + } else { + *size = NUM2SIZET(argv[0]); + + if (*size < 0) { + rb_raise(rb_eArgError, + "``size'' is negative or too large (%"PRIdPTR")", + (intptr_t)*size); + } + + if (argc == 1) { + *buf = rb_str_buf_new(*size); + } else { + rb_check_type(argv[1], RUBY_T_STRING); + *buf = aux_str_modify_expand(argv[1], *size); + rb_str_set_len(*buf, 0); + } + } + } + break; + default: + rb_error_arity(argc, 0, 2); + } +} + +/* + * call-seq: + * read -> read_data + * read(readsize, buf = "".b) -> buf + */ static VALUE -dec_update(VALUE self, VALUE src, VALUE srcoff, VALUE dest, VALUE maxdest) +dec_read(int argc, VALUE argv[], VALUE self) { /* * ZSTDLIB_API size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inBuffer* input); */ - ZSTD_DStream *p = getdecoder(self); + ssize_t size; + VALUE buf; + dec_read_args(argc, argv, self, &buf, &size); - rb_check_type(src, RUBY_T_STRING); - const char *q = RSTRING_PTR(src); - const char *qq = RSTRING_END(src); - const char *q1 = q + NUM2SIZET(srcoff); - if (q1 < q || q1 > qq) { - rb_raise(rb_eArgError, - "``srcoff'' is out of range (given %"PRIuSIZE", expect 0..%"PRIuSIZE")", - q1 - q, qq - q); + struct decoder *p = decoder_context(self); + + if (size == 0) { + rb_str_set_len(buf, 0); + return buf; + } else if (size > 0) { + size = dec_read_decode(self, p, RSTRING_PTR(buf), size); + rb_str_set_len(buf, size); + } else { + /* if (size < 0) */ + + size_t capa = EXT_READ_GROWUP_SIZE; + + for (;;) { + aux_str_modify_expand(buf, capa); + size = dec_read_decode(self, p, RSTRING_PTR(buf) + RSTRING_LEN(buf), capa - RSTRING_LEN(buf)); + rb_str_set_len(buf, RSTRING_LEN(buf) + size); + if (size == 0) { break; } + size = rb_str_capacity(buf); + if (size > RSTRING_LEN(buf)) { break; } + if (size > EXT_READ_DOUBLE_GROWUP_LIMIT_SIZE) { + capa += EXT_READ_DOUBLE_GROWUP_LIMIT_SIZE; + } else { + capa *= 2; + } + } } - rb_check_type(dest, RUBY_T_STRING); - rb_str_modify(dest); - rb_str_set_len(dest, 0); - rb_str_modify_expand(dest, NUM2SIZET(maxdest)); - rb_obj_infect(self, src); - rb_obj_infect(dest, self); - char *r = RSTRING_PTR(dest); - const char *rr = r + rb_str_capacity(dest); + rb_obj_infect(buf, self); - ZSTD_inBuffer input = { q, qq - q, q1 - q }; - ZSTD_outBuffer output = { r, rr - r, 0 }; - size_t s = ZSTD_decompressStream(p, &output, &input); - extzstd_check_error(s); - rb_str_set_len(dest, output.pos); - return SIZET2NUM(input.pos); + if (RSTRING_LEN(buf) == 0) { + return Qnil; + } else { + return buf; + } } static VALUE +dec_eof(VALUE self) +{ + return (decoder_context(self)->reached_eof == 0 ? Qfalse : Qtrue); +} + +static VALUE dec_reset(VALUE self) { /* * ZSTDLIB_API size_t ZSTD_resetDStream(ZSTD_DStream* zds); */ - size_t s = ZSTD_resetDStream(getdecoder(self)); + size_t s = ZSTD_resetDStream(decoder_context(self)->context); extzstd_check_error(s); return self; } static VALUE @@ -315,24 +569,26 @@ { /* * ZSTDLIB_API size_t ZSTD_sizeof_DStream(const ZSTD_DStream* zds); */ - size_t s = ZSTD_sizeof_DStream(getdecoder(self)); + size_t s = ZSTD_sizeof_DStream(decoder_context(self)->context); extzstd_check_error(s); return SIZET2NUM(s); } static void init_decoder(void) { - cStreamDecoder = rb_define_class_under(extzstd_mZstd, "StreamDecoder", rb_cObject); + cStreamDecoder = rb_define_class_under(extzstd_mZstd, "Decoder", rb_cObject); rb_define_alloc_func(cStreamDecoder, dec_alloc); rb_define_const(cStreamDecoder, "INSIZE", SIZET2NUM(ZSTD_DStreamInSize())); rb_define_const(cStreamDecoder, "OUTSIZE", SIZET2NUM(ZSTD_DStreamOutSize())); - rb_define_method(cStreamDecoder, "initialize", dec_init, 1); - rb_define_method(cStreamDecoder, "update", dec_update, 4); + rb_define_method(cStreamDecoder, "initialize", dec_init, -1); + rb_define_method(cStreamDecoder, "read", dec_read, -1); + rb_define_method(cStreamDecoder, "eof", dec_eof, 0); + rb_define_alias(cStreamDecoder, "eof?", "eof"); rb_define_method(cStreamDecoder, "reset", dec_reset, 0); rb_define_method(cStreamDecoder, "sizeof", dec_sizeof, 0); } /* @@ -340,8 +596,11 @@ */ void extzstd_init_stream(void) { + id_op_lsh = rb_intern("<<"); + id_read = rb_intern("read"); + init_encoder(); init_decoder(); }