Sha256: e7cacf1015723c8bfbc1042450c1ef0da464da1212ebbadcf4fe509a8f418061

Contents?: true

Size: 1.93 KB

Versions: 1

Compression:

Stored size: 1.93 KB

Contents

require 'weakref'
require 'thread'

require 'girl_friday/version'
require 'girl_friday/work_queue'
require 'girl_friday/error_handler'
require 'girl_friday/persistence'
require 'girl_friday/batch'

begin
  # Rubinius or JRuby
  require 'rubinius/actor'
  require 'girl_friday/monkey_patches'
  GirlFriday::WorkQueue::Actor = Rubinius::Actor
rescue LoadError
  # Others
  require 'girl_friday/actor'
end

module GirlFriday

  @lock = Mutex.new

  def self.add_queue(ref)
    @lock.synchronize do
      @queues ||= []
      @queues.reject! { |q| !q.weakref_alive? }
      @queues << ref
    end
  end

  def self.remove_queue(ref)
    @lock.synchronize do
      @queues.delete ref
    end
  end

  def self.queues
    @queues || []
  end

  def self.status
    queues.inject({}) do |memo, queue|
      begin
        memo = memo.merge(queue.__getobj__.status)
      rescue WeakRef::RefError
      end
      memo
    end
  end

  ##
  # Notify girl_friday to shutdown ASAP.  Workers will not pick up any
  # new work; any new work pushed onto the queues will be pushed onto the
  # backlog (and persisted).  This method will block until all queues are
  # quiet or the timeout has passed.
  #
  # Note that shutdown! just works with existing queues.  If you create a
  # new queue, it will act as normal.
  def self.shutdown!(timeout=30)
    qs = queues.select { |q| q.weakref_alive? }
    count = qs.size

    if count > 0
      m = Mutex.new
      var = ConditionVariable.new

      qs.each do |q|
        next if !q.weakref_alive?
        begin
          q.__getobj__.shutdown do |queue|
            m.synchronize do
              count -= 1
              var.signal if count == 0
            end
          end
        rescue WeakRef::RefError
          m.synchronize do
            count -= 1
            var.signal if count == 0
          end
        end
      end

      m.synchronize do
        var.wait(m, timeout) if count != 0
      end
    end
    count
  end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
girl_friday-0.11.0 lib/girl_friday.rb