Sha256: 86274ee963d11f22f6f1bd2edbd4f18a11ee6d98a4dbabcfc44f6fd079c587f8

Contents?: true

Size: 1.84 KB

Versions: 9

Compression:

Stored size: 1.84 KB

Contents

require 'concurrent/logging'

module Concurrent

  # Ensures that posted jobs are executed in a serialized order, never running
  # at the same time. While one job is in flight, later jobs are stashed and
  # dispatched one by one (first in, first out) as each job finishes.
  class SerializedExecution
    include Logging

    # A unit of work: the executor it should be posted to, the arguments for
    # the task, and the task block itself.
    Job = Struct.new(:executor, :args, :block) do
      # Invokes the task block with the stored arguments.
      def call
        block.call(*args)
      end
    end

    def initialize
      @being_executed = false     # true while a job is dispatched or running
      @stash          = []        # jobs waiting for the current one to finish
      @mutex          = Mutex.new # guards @being_executed and @stash
    end

    # Submit a task to the executor for asynchronous processing. If another
    # job posted through this object is still in flight, the task is stashed
    # and dispatched once the preceding jobs have finished.
    #
    # @param [Executor] executor to be used for this job
    #
    # @param [Array] args zero or more arguments to be passed to the task
    #
    # @yield the asynchronous task to perform
    #
    # @return [true, nil] `true` once the task has been dispatched or stashed,
    #   `nil` if no task block is given (nothing is queued in that case)
    def post(executor, *args, &task)
      return nil if task.nil?

      job = Job.new executor, args, task

      # Decide under the lock whether this job may start right away or has to
      # wait behind the one currently in flight.
      run_now = @mutex.synchronize do
        if @being_executed
          @stash << job
          false
        else
          @being_executed = true
        end
      end

      # Dispatch outside the lock so the executor is never called while the
      # mutex is held.
      call_job job if run_now
      true
    end

    private

    # Posts the job to its executor; if the executor rejects it, the job is
    # run on the calling thread instead so the chain of stashed jobs keeps
    # moving.
    def call_job(job)
      did_it_run = begin
        job.executor.post { work(job) }
        true
      rescue RejectedExecutionError
        false
      end

      # TODO not the best idea to run it myself
      unless did_it_run
        begin
          work job
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
      end
    end

    # Runs the job and then ensures the next stashed job, if any, is
    # dispatched. This happens even when the job raises; the exception still
    # propagates after the next job has been handed off.
    def work(job)
      job.call
    ensure
      # Take the next stashed job, or mark the serializer idle when the stash
      # is empty. Both happen under the lock so `post` sees a consistent state.
      next_job = @mutex.synchronize do
        @stash.shift || (@being_executed = false)
      end

      call_job next_job if next_job
    end

  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
concurrent-ruby-0.7.0.rc0 lib/concurrent/executor/serialized_execution.rb
concurrent-ruby-0.7.0.rc0-x86_64-linux lib/concurrent/executor/serialized_execution.rb
concurrent-ruby-0.7.0.rc0-x86_64-darwin-13 lib/concurrent/executor/serialized_execution.rb
concurrent-ruby-0.7.0.rc0-x86-solaris-2.11 lib/concurrent/executor/serialized_execution.rb
concurrent-ruby-0.7.0.rc0-x86-mingw32 lib/concurrent/executor/serialized_execution.rb
concurrent-ruby-0.7.0.rc0-x86-linux lib/concurrent/executor/serialized_execution.rb
concurrent-ruby-0.7.0.rc0-x64-mingw32 lib/concurrent/executor/serialized_execution.rb
concurrent-ruby-0.7.0.rc0-java lib/concurrent/executor/serialized_execution.rb
concurrent-ruby-0.6.1 lib/concurrent/executor/serialized_execution.rb