Sha256: 663c0f43dec99b0cbc89bd7eff76c68dd653fa58096265abebbfb7c5f396b3ea

Contents?: true

Size: 1.56 KB

Versions: 1

Compression:

Stored size: 1.56 KB

Contents

require 'eventmachine'
require 'swift'

module Swift
  # Eventmachine Adapter Extensions.
  #
  # This replaces the Adapter#execute method with a non-blocking asynchronous version.
  # The previous implementation remains available as Adapter#blocking_execute.
  class Adapter
    alias :blocking_execute :execute

    # Connection handler registered with EM.watch on the adapter's file descriptor.
    #
    # When the descriptor becomes readable it stops watching, fetches the adapter's
    # result and resolves the deferrable handed out by Adapter#execute.
    class EMHandler < EM::Connection
      # @param adapter [Swift::Adapter] adapter the command was issued on
      # @param record  [Class, nil]     record class used to wrap the result, or nil for the raw result
      # @param defer   [EM::Deferrable] deferrable resolved with the result or failed with the error
      def initialize adapter, record, defer
        @started = Time.now # NOTE(review): not read anywhere in this class
        @adapter = adapter
        @record  = record
        @defer   = defer
      end

      # Called by EventMachine once the watched descriptor is readable.
      #
      # Dequeues the pending command (logging it when the adapter is tracing) and
      # resolves the deferrable. A StandardError raised while fetching or wrapping the
      # result, or escaping from +succeed+, is handed to the deferrable's +fail+.
      def notify_readable
        detach
        start, command, bind = @adapter.pending.shift
        @adapter.log_command(start, command, bind) if @adapter.trace?

        begin
          result = @adapter.result
          result = Result.new(@record, result) if @record
          @defer.succeed(result)
        rescue StandardError => e
          # Only StandardError is routed to the errback: Interrupt, SystemExit and
          # other non-standard exceptions must propagate instead of being swallowed.
          @defer.fail(e)
        end
      end
    end

    # Execute a command asynchronously.
    #
    # @example
    #   defer = Swift.db.execute(User, "select * from users where id = ?", 1)
    #   defer.callback do |user|
    #     p user.id
    #   end
    #   defer.errback do |error|
    #     p error
    #   end
    #
    # @param  command [Class, String] a Record subclass (the command string is then taken
    #   from the first bind value) or the command string itself
    # @param  bind    [Array]         bind values passed through to the query
    # @return [EM::DefaultDeferrable] resolved by EMHandler once the descriptor is readable
    # @raise  [RuntimeError]          if a previous command is still pending
    #
    # @see  [Swift::Adapter]
    def execute command, *bind
      raise RuntimeError, 'Command already in progress' unless pending.empty?

      start = Time.now
      record, command = command, bind.shift if command.kind_of?(Class) && command < Record
      pending << [start, command, bind]

      dispatched = false
      begin
        query(command, *bind)
        dispatched = true
      ensure
        # If the query could not be dispatched, drop the entry queued above; otherwise
        # it would never be shifted off and every later call would raise
        # 'Command already in progress'.
        pending.pop unless dispatched
      end

      EM::DefaultDeferrable.new.tap do |defer|
        EM.watch(fileno, EMHandler, self, record, defer) {|c| c.notify_readable = true }
      end
    end

    # Queue of [start_time, command, bind] entries for commands awaiting a result.
    #
    # @return [Array]
    def pending
      @pending ||= []
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
swift-1.1.0 lib/swift/eventmachine.rb