Sha256: e115c589b408d34b41c978f620703c5d099ddad501d43632a5119ea90055ec25

Contents?: true

Size: 859 Bytes

Versions: 1

Compression:

Stored size: 859 Bytes

Contents

# frozen_string_literal: true

module ActiveConcurrency
  module Schedulers
    # Scheduler that routes each job to a worker registered under a topic.
    #
    # At construction the workers in the pool are dealt out to the given
    # topics in round-robin order, and every worker of a topic receives the
    # same Mutex instance (one Mutex per distinct topic). When a job is
    # scheduled, the worker reporting the smallest +size+ within the
    # requested topic is chosen.
    class Topic

      # @param pool [Enumerable] workers; each must respond to +mutex=+,
      #   +size+ and +schedule+
      # @param options [Hash] +:topics+ lists the topics to distribute the
      #   workers over (a single non-array value is treated as one topic)
      # @raise [ArgumentError] when no topics are supplied
      def initialize(pool, **options)
        topics = Array(options[:topics])
        raise ArgumentError, 'at least one topic is required' if topics.empty?

        mutexes = topics.each_with_object({}) do |topic, by_topic|
          by_topic[topic] = Mutex.new
        end

        @pool = topics_pool(pool, topics, mutexes)
      end

      # Schedules a job on the least-loaded worker of a topic.
      #
      # The topic is taken from the LAST positional argument; the remaining
      # arguments and the block are forwarded to the worker unchanged.
      #
      # @return whatever the chosen worker's +schedule+ returns
      # @raise [ArgumentError] when no worker is assigned to the topic
      #   (unknown topic, or fewer workers than topics)
      def schedule(*args, &block)
        topic = args.pop
        workers = @pool[topic]
        raise ArgumentError, "no workers assigned to topic #{topic.inspect}" if workers.nil?

        workers.min_by(&:size).schedule(*args, &block)
      end

      private

      # Groups the workers by topic, assigning topics round-robin and handing
      # each worker the Mutex that belongs to its topic.
      #
      # @return [Hash] topic => Array of workers
      def topics_pool(pool, topics, mutexes)
        rotation = topics.cycle

        pool.each_with_object({}) do |worker, grouped|
          topic = rotation.next
          worker.mutex = mutexes[topic]
          (grouped[topic] ||= []) << worker
        end
      end

    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
active_concurrency-0.1.0 lib/active_concurrency/schedulers/topic.rb