Sha256: 798662656bbfd60f58856e79499bb3f84fdb2f57f50b90d3d954f1480b53214f

Contents?: true

Size: 1.43 KB

Versions: 1

Compression:

Stored size: 1.43 KB

Contents

module Concurrent

  # Ensures that jobs are passed to the underlying executor one by one,
  # never running at the same time.
  #
  # Every job carries its own executor, so jobs posted through the same
  # OneByOne instance may run on different executors; a job is only handed
  # to its executor once the previously submitted job has finished.
  class OneByOne

    # A unit of work: the executor it must be submitted to, the arguments
    # for the task, and the task itself.
    Job = Struct.new(:executor, :args, :block) do
      # Runs the task with the stored arguments.
      def call
        block.call(*args)
      end
    end

    def initialize
      @being_executed = false     # true while a job is submitted or running
      @stash          = []        # jobs waiting for their turn (FIFO)
      @mutex          = Mutex.new # guards @being_executed and @stash
    end

    # Submit a task to the executor for asynchronous processing.
    #
    # If another job is currently submitted or running, the task is stashed
    # and handed to its executor after all previously posted jobs finished.
    #
    # @param [Executor] executor to be used for this job
    #
    # @param [Array] args zero or more arguments to be passed to the task
    #
    # @yield the asynchronous task to perform
    #
    # @return [Boolean, nil] `true` if the task was stashed or accepted by
    #   the executor, `false` if the executor refused it (its `post` returned
    #   `false`, e.g. because it is not running), `nil` if no task is given
    #
    # NOTE(review): an exception raised by the executor's own `post` still
    # propagates to the caller and leaves the slot marked as occupied.
    def post(executor, *args, &task)
      return nil if task.nil?
      job = Job.new(executor, args, task)

      # Decide atomically whether this job takes the free slot or must wait.
      run_now = @mutex.synchronize do
        if @being_executed
          @stash << job
          false
        else
          @being_executed = true
        end
      end

      run_now ? call_job(job) : true
    end

    private

    # Hands +job+ to its executor. If the executor refuses it, +work+ will
    # never run for that job, so the slot is passed on to the next stashed
    # job here; otherwise every later job would stay stashed forever.
    #
    # @return [Boolean] whether +job+ itself was accepted by its executor
    def call_job(job)
      return true if submit(job)

      loop do
        job = next_job
        break if job.nil? || submit(job)
      end
      false
    end

    # Posts +job+ to its executor.
    #
    # Only an explicit `false` counts as a rejection, so executor-like
    # objects returning `nil` or anything else are treated as accepting.
    def submit(job)
      accepted = job.executor.post { work(job) }
      accepted != false
    end

    # Takes the next stashed job, or marks the slot as free when the stash
    # is empty.
    #
    # @return [Job, nil]
    def next_job
      @mutex.synchronize do
        job = @stash.shift
        @being_executed = false if job.nil?
        job
      end
    end

    # Runs +job+; ensures next job is executed if any is stashed, even when
    # the job raises.
    def work(job)
      job.call
    ensure
      successor = next_job
      call_job(successor) if successor
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
concurrent-ruby-0.6.0.pre.2 lib/concurrent/executor/one_by_one.rb