Sha256: bd8444d695b60da85fc06d32c020770a784cb8d594664c8303b4ce14481c555f

Contents?: true

Size: 1.51 KB

Versions: 2

Compression:

Stored size: 1.51 KB

Contents

module XPFlow

    # Counting semaphore built on a Mutex and a ConditionVariable.
    #
    # Keeps a counter of available permits: +acquire+ blocks while the
    # counter is zero and then decrements it; +release+ increments it
    # and signals one waiting thread.
    class Semaphore

        # n - initial number of available permits.
        def initialize(n)
            @n = n
            @mutex = Mutex.new
            @cond = ConditionVariable.new
        end

        # Takes one permit, blocking the calling thread while none is
        # available. Returns the number of permits left afterwards.
        def acquire
            @mutex.synchronize do
                # Re-check the counter after every wakeup: another thread
                # may have taken the permit between the signal and this
                # thread re-acquiring the mutex.
                while @n == 0
                    @cond.wait(@mutex)
                end
                @n -= 1
            end
        end

        # Returns one permit and wakes up a single waiter, if any.
        def release
            @mutex.synchronize do
                @n += 1
                @cond.signal
            end
        end

        # Runs the given block while holding one permit of this semaphore
        # and returns the block's value. The permit is given back even if
        # the block raises.
        def synchronize
            # The permit is taken from the semaphore itself (the internal
            # Mutex has no acquire/release methods). Acquiring happens
            # outside begin/ensure so that a failed or interrupted acquire
            # never releases a permit that was not taken.
            acquire
            begin
                yield
            ensure
                release
            end
        end

    end

    # FIFO queue shared between threads.
    #
    # A semaphore counts the stored elements so that +pop+ blocks while
    # the queue is empty; a mutex guards the backing array and a
    # condition variable announces that the queue has been drained.
    class SyncQueue

        # The backing array (read without taking the mutex).
        attr_reader :q

        def initialize
            @q = []
            @elements = Semaphore.new(0)
            @mutex = Mutex.new
            @cv = ConditionVariable.new  # signalled when the queue becomes empty
        end

        # Appends x to the queue, then makes one more element available
        # to consumers blocked in +pop+.
        def push(x)
            @mutex.synchronize { @q.push(x) }
            @elements.release
        end

        # Removes and returns the oldest element, blocking until one is
        # available. When a block is given, the element is passed to it
        # while the queue mutex is still held, and the block's value is
        # returned instead.
        def pop
            @elements.acquire
            @mutex.synchronize do
                item = @q.shift
                # Wake threads in wait_empty; they resume only once this
                # synchronize section (including the block call) is done.
                @cv.broadcast if @q.empty?
                block_given? ? yield(item) : item
            end
        end

        # Blocks the caller until the queue holds no elements.
        def wait_empty
            @mutex.synchronize do
                @cv.wait(@mutex) until @q.empty?
            end
        end

    end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
xpflow-0.1c lib/xpflow/concurrency.rb
xpflow-0.1b lib/xpflow/concurrency.rb