Sha256: 33b8a727c2ef631a432b2a72b6467b43f9f192ef58ce81840009450edd57be4f

Contents?: true

Size: 1.7 KB

Versions: 1

Compression:

Stored size: 1.7 KB

Contents

module Cute
   module Synchronization
     class Semaphore
      def initialize(max)
        @lock = Mutex.new
        @cond = ConditionVariable.new
        @used = 0
        @max = max
      end

      def acquire(n = 1)
        @lock.synchronize {
          while (n > (@max - @used)) do
            @cond.wait(@lock)
          end
          @used += n
        }
      end

      def relaxed_acquire(n = 1)
        taken = 0
        @lock.synchronize {
          while (@max == @used) do
            @cond.wait(@lock)
          end
          taken = (n + @used) > @max ? @max - @used : n
          @used += taken
        }
        return n - taken
      end

      def release(n = 1)
        @lock.synchronize {
          @used -= n
          @cond.broadcast
        }
      end
    end

    class SlidingWindow
      def initialize(size)
        @queue = []
        @lock = Mutex.new
        @finished = false
        @size = size
      end

      def add(t)
        @queue << t
      end

      def run
        tids = []
        (1..@size).each {
          tids << Thread.new {
            while !@finished do
              task = nil
              @lock.synchronize {
                if @queue.size > 0
                  task = @queue.pop
                else
                  @finished = true
                end
              }
              if task
                if task.is_a?(Proc)
                  task.call
                else
                  system(task)
                end
              end
            end
          }
        }
        tids.each { |tid| tid.join }
      end
    end
  end
end

if (__FILE__ == $0)
  w = Cute::Synchronization::SlidingWindow.new(3)
  (1..10).each {
    w.add("sleep 1")
  }
  w.run
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
ruby-cute-0.24 lib/cute/synchronization.rb