Sha256: 3f96271d38799fee1e4f0124786aa115a24d84c321a210ab6f22640b0096dd4f

Contents?: true

Size: 1.81 KB

Versions: 20

Compression:

Stored size: 1.81 KB

Contents

require 'concurrent/actor/utils/balancer'

module Concurrent
  module Actor
    module Utils

      # Allows to create a pool of workers and distribute work between them
      # @param [Integer] size number of workers
      # @yield [balancer, index] a block spawning an worker instance. called +size+ times.
      #   The worker should be descendant of AbstractWorker and supervised, see example.
      # @yieldparam [Balancer] balancer to pass to the worker
      # @yieldparam [Integer] index of the worker, usually used in its name
      # @yieldreturn [Reference] the reference of newly created worker
      # @example
      #     class Worker < Concurrent::Actor::Utils::AbstractWorker
      #       def work(message)
      #         p message * 5
      #       end
      #     end
      #
      #     pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |balancer, index|
      #       Worker.spawn name: "worker-#{index}", supervise: true, args: [balancer]
      #     end
      #
      #     pool << 'asd' << 2
      #     # prints:
      #     # "asdasdasdasdasd"
      #     # 10
      class Pool < RestartingContext
        def initialize(size, &worker_initializer)
          @balancer = Balancer.spawn name: :balancer, supervise: true
          @workers  = Array.new(size, &worker_initializer.curry[@balancer])
          @workers.each { |w| Type! w, Reference }
        end

        def on_message(message)
          redirect @balancer
        end
      end

      class AbstractWorker < RestartingContext
        def initialize(balancer)
          @balancer = balancer
          @balancer << :subscribe
        end

        def on_message(message)
          work message
        ensure
          @balancer << :subscribe
        end

        def work(message)
          raise NotImplementedError
        end
      end
    end
  end
end

Version data entries

20 entries across 20 versions & 1 rubygems

Version Path
concurrent-ruby-0.8.0 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.8.0-java lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.2 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.2-x86_64-linux lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.2-x86-solaris-2.11 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.2-x86-mingw32 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.2-x86-linux lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.2-x64-mingw32 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.2-java lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.8.0.pre2 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.8.0.pre2-java lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.8.0.pre1 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.8.0.pre1-java lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.1 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.1-x86_64-linux lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.1-x86-solaris-2.11 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.1-x86-mingw32 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.1-x86-linux lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.1-x64-mingw32 lib/concurrent/actor/utils/pool.rb
concurrent-ruby-0.7.1-java lib/concurrent/actor/utils/pool.rb