Sha256: 55749ba40b15e8dc73a78a3e122f605b0c7b7dec0b5584c1ab86f772dc4087bd

Contents?: true

Size: 1.05 KB

Versions: 2

Compression:

Stored size: 1.05 KB

Contents

module GetaroundUtils; end
module GetaroundUtils::Utils; end

class GetaroundUtils::Utils::AsyncQueue
  class << self
    include GetaroundUtils::Mixins::Loggable

    MAX_QUEUE_SIZE = 100
    MUTEX = Mutex.new

    def perform
      raise NotImplementedError
    end

    def perform_async(*args)
      start_once!

      if @queue.size > MAX_QUEUE_SIZE
        loggable('warn', 'Queue is overflowing')
        return
      end

      @queue.push(args)
    end

    def start_once!
      MUTEX.synchronize do
        return unless @parent.nil?

        @parent = Process.pid
        @queue = Queue.new

        @worker = Thread.new do
          while args = @queue.pop
            perform(*args)
          end
        rescue ClosedQueueError
          nil
        rescue StandardError => e
          loggable('error', e.message, class: e.class.to_s, backtrace: e.backtrace)
        end

        at_exit { terminate }
      end
    end

    def terminate
      @queue&.close
      @worker&.join
    end

    def reset
      terminate
      @parent = nil
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
getaround_utils-0.1.15 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.1.14 lib/getaround_utils/utils/async_queue.rb