Sha256: 420de8384c369d931c7dc95b2b095aee594cf7f1956a19605d5191694d0e478f
Contents?: true
Size: 1.8 KB
Versions: 2
Compression:
Stored size: 1.8 KB
Contents
require 'set' require 'hitimes' require 'lounger' require "ztimer/version" require "ztimer/slot" require "ztimer/sorted_store" require "ztimer/watcher" module Ztimer @concurrency = 20 @watcher = Ztimer::Watcher.new(){|slot| execute(slot) } @metric = Hitimes::Metric.new("Notifier") @workers_lock = Mutex.new @queue = Queue.new @running = 0 @count = 0 class << self attr_reader :concurrency, :running, :count def after(milliseconds, &callback) enqueued_at = @metric.utc_microseconds expires_at = enqueued_at + milliseconds * 1000 slot = Slot.new(enqueued_at, expires_at, &callback) add(slot) return slot end def jobs_count return @watcher.jobs end def concurrency=(new_value) raise ArgumentError.new("Invalid concurrency value: #{new_value}") unless new_value.is_a?(Fixnum) && new_value > 1 @concurrency = new_value end protected def add(slot) @count += 1 @watcher << slot end def execute(slot) @queue << slot @workers_lock.synchronize do [@concurrency - @running, @queue.size].min.times do @running += 1 worker = Thread.new do begin while !@queue.empty? && @queue.pop(true) do slot.executed_at = @metric.utc_microseconds slot.callback.call(slot) unless slot.callback.nil? end rescue ThreadError # queue is empty puts "queue is empty" rescue => e STDERR.puts e.inspect + (e.backtrace ? "\n" + e.backtrace.join("\n") : "") end @workers_lock.synchronize { @running -= 1 } end worker.abort_on_exception = true end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
ztimer-0.3.2 | lib/ztimer.rb |
ztimer-0.3.1 | lib/ztimer.rb |