Sha256: a63a02ce3b6960803ae77633938a3aae40f9589340d14c258ed4da896496a937

Contents?: true

Size: 1.96 KB

Versions: 22

Compression:

Stored size: 1.96 KB

Contents

require 'acfs/service/middleware'

module Acfs
  # Hands operations over to an adapter. Synchronous operations are run
  # immediately; all other operations are collected in an internal queue
  # and passed to the adapter in bulk once {#start} is called.
  #
  # @api private
  #
  class Runner
    include Service::Middleware
    attr_reader :adapter

    # @param adapter [Object] object receiving prepared requests; this
    #   class calls `run`, `queue`, `start` and `abort` on it.
    #
    def initialize(adapter)
      @running = false
      @adapter = adapter
    end

    # Entry point for a new operation: emits the
    # `acfs.operation.before_process` notification, then runs the
    # operation at once if it is synchronous and enqueues it otherwise.
    #
    def process(op)
      ::ActiveSupport::Notifications.instrument 'acfs.operation.before_process', operation: op

      if op.synchronous?
        run(op)
      else
        enqueue(op)
      end
    end

    # Execute the operation immediately, bypassing the queue. The work is
    # wrapped in an `acfs.runner.sync_run` notification.
    #
    def run(op)
      ::ActiveSupport::Notifications.instrument('acfs.runner.sync_run', operation: op) do
        op_request(op) { |request| adapter.run(request) }
      end
    end

    # Operations waiting to be handed to the adapter (lazily created).
    #
    def queue
      @queue ||= []
    end

    # Schedule the operation for later execution. While the runner is
    # already running the request goes straight to the adapter's queue;
    # otherwise the operation is stored until {#start} is called. The work
    # is wrapped in an `acfs.runner.enqueue` notification.
    #
    def enqueue(op)
      ::ActiveSupport::Notifications.instrument('acfs.runner.enqueue', operation: op) do
        if running?
          queue_request(op)
        else
          queue << op
        end
      end
    end

    # Whether queued operations are being processed at the moment.
    #
    def running?
      @running
    end

    # Hand all queued operations to the adapter and start it. Does nothing
    # when the runner is already running. If anything raises, the
    # remaining queue is dropped and the error is re-raised.
    #
    def start
      unless running?
        enqueue_operations
        start_all
      end
    rescue StandardError
      queue.clear
      raise
    end

    # Drop all queued operations, abort the adapter and mark the runner as
    # not running.
    #
    def clear
      queue.clear
      adapter.abort
      @running = false
    end

    private

    # Start the adapter with the running flag set; the flag is reset
    # afterwards even when the adapter raises.
    #
    def start_all
      @running = true

      begin
        adapter.start
      ensure
        @running = false
      end
    end

    # Drain the internal queue front to back, passing each operation's
    # request to the adapter's queue.
    #
    def enqueue_operations
      while (operation = queue.shift)
        queue_request(operation)
      end
    end

    # Build the request for the operation and put it on the adapter's
    # queue.
    #
    def queue_request(op)
      op_request(op) { |request| adapter.queue(request) }
    end

    # Build the request for an operation and yield it to the given block.
    #
    # Nothing is yielded (and nil is returned) when stubbing is enabled
    # and `Acfs::Stub.stubbed(op)` returns a truthy value, or when either
    # preparation step returns something that is not an Acfs::Request.
    #
    def op_request(op)
      return if Acfs::Stub.enabled? && Acfs::Stub.stubbed(op)

      # First preparation step: done by the operation's own service.
      service_request = op.service.prepare(op.request)
      return unless service_request.is_a?(Acfs::Request)

      # Second step: this runner's own `prepare` (not defined in this
      # class; presumably provided by Service::Middleware).
      runner_request = prepare(service_request)
      return unless runner_request.is_a?(Acfs::Request)

      yield runner_request
    end
  end
end

Version data entries

22 entries across 22 versions & 1 rubygem

Version Path
acfs-1.3.2 lib/acfs/runner.rb
acfs-1.3.1 lib/acfs/runner.rb
acfs-1.3.0 lib/acfs/runner.rb
acfs-1.2.1 lib/acfs/runner.rb
acfs-0.50.0 lib/acfs/runner.rb
acfs-1.2.0 lib/acfs/runner.rb
acfs-0.49.1 lib/acfs/runner.rb
acfs-1.1.1 lib/acfs/runner.rb
acfs-0.49.0 lib/acfs/runner.rb
acfs-1.1.0 lib/acfs/runner.rb
acfs-0.48.2 lib/acfs/runner.rb
acfs-1.0.1 lib/acfs/runner.rb
acfs-0.48.1 lib/acfs/runner.rb
acfs-1.0.0 lib/acfs/runner.rb
acfs-0.48.0 lib/acfs/runner.rb
acfs-0.47.0 lib/acfs/runner.rb
acfs-0.46.0 lib/acfs/runner.rb
acfs-0.45.0 lib/acfs/runner.rb
acfs-0.44.0 lib/acfs/runner.rb
acfs-0.43.2 lib/acfs/runner.rb