Sha256: 3491f1323403a63b5769c11b1609f8d43707b92083d784b2f6a30593bd892ed5

Contents?: true

Size: 1.08 KB

Versions: 9

Compression:

Stored size: 1.08 KB

Contents

# Namespace scaffolding: make sure both modules exist so that definitions
# using the fully-qualified GetaroundUtils::Utils:: prefix can be opened.
module GetaroundUtils
  module Utils
  end
end

# In-memory queue drained by a dedicated worker thread.
#
# Payloads handed to #push are accumulated in an array; the worker thread
# repeatedly removes up to BUFFER_SIZE of them and passes that batch to
# #perform. Subclasses are expected to override #perform(buffer).
#
# Errors raised while processing a batch are logged and the batch is dropped;
# the worker keeps running.
class GetaroundUtils::Utils::AsyncQueue
  include GetaroundUtils::Mixins::Loggable

  # Upper bound on pending payloads; #push drops (and logs) beyond this.
  MAX_QUEUE_SIZE = 1000
  # Maximum number of payloads handed to #perform in one batch.
  BUFFER_SIZE = 50

  # Starts the worker thread and registers an at_exit hook that drains the
  # queue (via #terminate) before the process exits.
  def initialize
    @queue = []
    @mutex = Mutex.new
    @closed = false
    @worker = Thread.new(&method(:thread_run))
    at_exit { terminate }
  end

  # Processes one batch of payloads. Must be overridden by subclasses.
  #
  # The worker invokes this as `perform(buffer)`, so the method has to accept
  # the batch; the default value only keeps argument-less calls working.
  #
  # @param _buffer [Array] payloads removed from the queue (at most BUFFER_SIZE)
  # @raise [NotImplementedError] always, in this base implementation
  def perform(_buffer = nil)
    raise NotImplementedError, "#{self.class} must implement #perform(buffer)"
  end

  # Enqueues a payload for asynchronous processing.
  #
  # When the queue already holds MAX_QUEUE_SIZE payloads the new payload is
  # dropped and an error is logged instead.
  #
  # @param payload [Object] item to be passed later to #perform in a batch
  def push(payload)
    @mutex.synchronize do
      if @queue.size >= MAX_QUEUE_SIZE
        loggable_log(:error, 'queue overflow')
      else
        @queue.push(payload)
      end
    end
  end

  # Worker loop: runs in the thread created by #initialize and returns once
  # the queue has been closed and is empty.
  def thread_run
    loop do
      # Take the batch and the closed flag in the same critical section: if
      # the flag were read after the lock is released, a payload pushed in
      # between could be left behind when the worker exits.
      buffer, closed = @mutex.synchronize { [@queue.shift(BUFFER_SIZE), @closed] }
      loggable_log(:debug, 'thread_run', buffer_size: buffer.size)
      return if closed && buffer.empty?

      perform(buffer) unless buffer.empty?
      # Idle for a second only when there is nothing to do; once closed, loop
      # straight away so shutdown is not delayed by a pointless sleep.
      sleep(1) unless @mutex.synchronize { @closed || @queue.any? }
    rescue StandardError, NotImplementedError => e
      # NotImplementedError is a ScriptError, not a StandardError, so it has
      # to be listed explicitly for a missing #perform override to be logged
      # rather than killing the worker thread.
      loggable_log(:error, e.message, class: e.class.to_s, backtrace: e.backtrace)
    end
  end

  # Marks the queue as closed and blocks until the worker thread has drained
  # the remaining payloads and exited.
  def terminate
    @mutex.synchronize { @closed = true }
    @worker&.join
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
getaround_utils-0.2.11 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.2.10 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.2.9 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.2.7 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.2.6 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.2.5 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.2.1 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.2.0 lib/getaround_utils/utils/async_queue.rb
getaround_utils-0.1.20 lib/getaround_utils/utils/async_queue.rb