Sha256: 371f99bd12786df0ef76900f481606514183d52a5ccfd654a0563e5a2e65d05e

Contents?: true

Size: 1.95 KB

Versions: 20

Compression:

Stored size: 1.95 KB

Contents

package org.embulk.exec;

import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.util.ResourceLeakDetector;
import org.embulk.spi.Buffer;
import org.embulk.spi.BufferAllocator;

public class PooledBufferAllocator
        implements BufferAllocator
{
    private static final int DEFAULT_BUFFER_SIZE = 32*1024;
    private static final int MINIMUM_BUFFER_SIZE = 8*1024;

    private final PooledByteBufAllocator nettyBuffer;

    public PooledBufferAllocator()
    {
        // TODO configure parameters
        this.nettyBuffer = new PooledByteBufAllocator(false);
    }

    public Buffer allocate()
    {
        return allocate(DEFAULT_BUFFER_SIZE);
    }

    public Buffer allocate(int minimumCapacity)
    {
        int size = MINIMUM_BUFFER_SIZE;
        while (size < minimumCapacity) {
            size *= 2;
        }
        return new NettyByteBufBuffer(nettyBuffer.buffer(size));
    }

    private static class NettyByteBufBuffer
            extends Buffer
    {
        private ByteBuf buf;
        private BufferReleasedBeforeAt doubleFreeCheck;

        public NettyByteBufBuffer(ByteBuf buf)
        {
            super(buf.array(), buf.arrayOffset(), buf.capacity());
            this.buf = buf;
        }

        public void release()
        {
            if (doubleFreeCheck != null) {
                new BufferDoubleReleasedException(doubleFreeCheck).printStackTrace();
            }
            if (buf != null) {
                buf.release();
                buf = null;
                doubleFreeCheck = new BufferReleasedBeforeAt();
            }
        }
    }

    static class BufferReleasedBeforeAt
            extends Throwable
    { }

    static class BufferDoubleReleasedException
            extends IllegalStateException
    {
        public BufferDoubleReleasedException(BufferReleasedBeforeAt releasedAt)
        {
            super("Detected double release() call of a buffer", releasedAt);
        }
    }
}

Version data entries

20 entries across 20 versions & 1 rubygems

Version Path
embulk-0.6.19 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.18 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.17 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.16 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.15 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.14 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.13 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.12 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.11 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.10 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.9 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.8 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.7 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.6 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.5 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.4 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.3 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.2 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.1 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java
embulk-0.6.0 embulk-core/src/main/java/org/embulk/exec/PooledBufferAllocator.java