Sha256: e115c589b408d34b41c978f620703c5d099ddad501d43632a5119ea90055ec25
Contents?: true
Size: 859 Bytes
Versions: 1
Compression:
Stored size: 859 Bytes
Contents
# frozen_string_literal: true

module ActiveConcurrency
  module Schedulers
    # Scheduler that partitions a pool of workers into per-topic groups and
    # dispatches each job to the least-loaded worker of the requested topic.
    #
    # Workers are expected to respond to `mutex=`, `size` and `schedule`.
    class Topic

      # Assigns the workers in +pool+ to topics in round-robin order and gives
      # every worker of a topic the same Mutex instance.
      #
      # @param pool [Enumerable] workers to distribute across the topics
      # @param options [Hash] keyword options
      # @option options [Enumerable] :topics topic identifiers (required,
      #   non-empty); a single non-enumerable value is treated as one topic
      # @raise [ArgumentError] if no topics are supplied
      def initialize(pool, **options)
        topics = Array(options[:topics])
        # Without at least one topic the round-robin enumerator has nothing to
        # yield (StopIteration), so fail early with a descriptive error.
        raise ArgumentError, 'at least one topic is required (topics: [...])' if topics.empty?

        # One mutex per distinct topic, shared by all workers of that topic.
        mutexes = topics.each_with_object({}) do |topic, locks|
          locks[topic] = Mutex.new
        end

        @pool = topics_pool(pool, topics, mutexes)
      end

      # Schedules a job on the worker with the smallest `size` among the
      # workers assigned to the given topic.
      #
      # The topic is the LAST positional argument; everything before it, and
      # the block, is forwarded to the chosen worker's `schedule`.
      #
      # @param args [Array] job arguments followed by the topic
      # @return [Object] whatever the worker's `schedule` returns
      # @raise [ArgumentError] if the topic has no workers assigned to it
      def schedule(*args, &block)
        topic = args.pop

        workers = @pool.fetch(topic) do
          raise ArgumentError, "unknown topic: #{topic.inspect}"
        end

        worker = workers.min_by(&:size)
        worker.schedule(*args, &block)
      end

      private

      # Builds the topic => [workers] mapping, walking the topics cyclically
      # so workers are spread across them in round-robin order, and hands each
      # worker the mutex belonging to its topic.
      #
      # @param pool [Enumerable] workers to distribute
      # @param topics [Array] non-empty list of topics
      # @param mutexes [Hash] topic => Mutex lookup
      # @return [Hash] topic => Array of workers
      def topics_pool(pool, topics, mutexes)
        rotation = topics.cycle

        pool.each_with_object({}) do |worker, grouped|
          topic = rotation.next
          worker.mutex = mutexes[topic]
          (grouped[topic] ||= []) << worker
        end
      end

    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
active_concurrency-0.1.0 | lib/active_concurrency/schedulers/topic.rb |