Sha256: c2af803e95fae5120bbaabe6723dbb496e625689010a4ab8b0957632744f5a4a

Contents?: true

Size: 1.99 KB

Versions: 6

Compression:

Stored size: 1.99 KB

Contents

require 'thread'

module RestFtpDaemon
  # Priority queue of jobs shared between producer and consumer threads.
  #
  # Jobs are any objects responding to +priority+.  #pop hands out the pending
  # job whose +priority.to_i+ is highest; jobs with the same priority are
  # served in the order they were pushed.  Every job handed out is also
  # recorded in #popped.
  class JobQueue < Queue

    # Jobs still waiting to be popped (the live internal array, in push order).
    attr_reader :queued

    # Jobs already handed out by #pop, in the order they were popped.
    attr_reader :popped

    def initialize
      # Initialise the parent Queue as well, so that inherited methods which
      # are not overridden here operate on a properly set up object.
      super()

      @queued = []
      @popped = []

      @mutex = Mutex.new
      # Signalled once per pushed job to release one blocked consumer.
      @job_available = ConditionVariable.new
      # Number of threads currently blocked inside #pop.
      @num_waiting = 0
    end

    # Number of jobs waiting to be popped.
    def queued_size
      @queued.length
    end

    # Same as #queued_size; overrides the inherited Queue#size / Queue#length,
    # which would otherwise report the (unused) parent storage.
    def size
      queued_size
    end
    alias length size

    # Number of jobs already handed out by #pop.
    def popped_size
      @popped.length
    end

    # New array holding the pending jobs followed by the popped ones.
    def all
      @queued + @popped
    end

    # Total number of jobs seen so far (pending + popped).
    def all_size
      popped_size + queued_size
    end

    #
    # Adds +obj+ to the queue and wakes up one thread blocked in #pop, if any.
    # Raises RuntimeError when +obj+ does not respond to +priority+.
    # Returns the queue itself so that calls can be chained (q << a << b).
    #
    def push(obj)
      # Check that the item responds to the "priority" method
      raise "object should respond to priority method" unless obj.respond_to? :priority

      @mutex.synchronize do
        @queued.push obj
        @job_available.signal
      end
      self
    end
    alias << push
    alias enq push

    #
    # Retrieves data from the queue.  If the queue is empty, the calling thread is
    # suspended until data is pushed onto the queue.  If +non_block+ is true, the
    # thread isn't suspended, and a ThreadError is raised.
    #
    def pop(non_block=false)
      @mutex.synchronize do
        # Re-check emptiness after every wake-up: another consumer may have
        # taken the job first, or the wake-up may have been spurious.
        while @queued.empty?
          raise ThreadError, "queue empty" if non_block
          begin
            @num_waiting += 1
            @job_available.wait @mutex
          ensure
            # Keeps the count right even if the wait is interrupted.
            @num_waiting -= 1
          end
        end
        pick
      end
    end
    alias shift pop
    alias deq pop

    # True when no job is waiting to be popped.
    def empty?
      @queued.empty?
    end

    # Drops every pending job; the history kept in #popped is left untouched.
    def clear
      @queued.clear
    end

    # Number of threads currently blocked in #pop.
    def num_waiting
      @num_waiting
    end

  protected

    # Removes and returns the pending job with the highest +priority.to_i+,
    # recording it in the popped list.  Returns nil when nothing is pending.
    # Callers are expected to hold @mutex.
    def pick
      return nil if @queued.empty?

      # Single pass for the highest priority; the negated position makes the
      # earliest pushed job win among equal priorities.
      _job, index = @queued.each_with_index.max_by do |job, position|
        [job.priority.to_i, -position]
      end

      # Remove exactly that slot, leaving duplicates or ==-equal jobs queued
      picked = @queued.delete_at(index)

      # Stack it to popped items
      @popped.push picked

      # Return picked
      picked
    end

  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
rest-ftp-daemon-0.72b lib/rest-ftp-daemon/job_queue.rb
rest-ftp-daemon-0.70 lib/rest-ftp-daemon/job_queue.rb
rest-ftp-daemon-0.60 lib/rest-ftp-daemon/job_queue.rb
rest-ftp-daemon-0.6 lib/rest-ftp-daemon/job_queue.rb
rest-ftp-daemon-0.55 lib/rest-ftp-daemon/job_queue.rb
rest-ftp-daemon-0.41 lib/rest-ftp-daemon/job_queue.rb