lib/rumai/ixp/transport.rb in rumai-3.1.0 vs lib/rumai/ixp/transport.rb in rumai-3.2.0
- old
+ new
@@ -3,34 +3,33 @@
# Copyright protects this work.
# See LICENSE file for details.
#++
require 'rumai/ixp/message'
-require 'thread' # for Mutex
+require 'thread' # for Mutex and Queue
module Rumai
module IXP
##
- # A thread-safe proxy that multiplexes many
+ # A thread-safe channel that multiplexes many
# threads onto a single 9P2000 connection.
#
+ # The send/recv implementation is based on the XCB cookie approach:
+ # http://www.x.org/releases/X11R7.5/doc/libxcb/tutorial/#requestsreplies
+ #
class Agent
attr_reader :msize
+ ##
+ # [stream]
+ # I/O stream on which a 9P2000 server is listening.
+ #
def initialize stream
- @stream = stream
- @send_lock = Mutex.new
- @recv_bays = Hash.new {|h,k| h[k] = Queue.new } # tag => Queue(message)
+ @stream = stream
- # background thread which continuously receives
- # and dispatches messages from the 9P2000 server
- Thread.new do
- while true
- msg = Fcall.from_9p @stream
- @recv_bays[msg.tag] << msg
- end
- end.priority = -1
+ @recv_buf = {} # tag => message
+ @recv_lock = Mutex.new
@tag_pool = RangedPool.new(0...BYTE2_MASK)
@fid_pool = RangedPool.new(0...BYTE4_MASK)
# establish connection with 9P2000 server
@@ -69,11 +68,13 @@
@lim = @lim.succ unless range.exclude_end?
@pool = Queue.new
end
+ ##
# Returns an unoccupied range member from the pool.
+ #
def obtain
begin
@pool.deq true
rescue ThreadError
@@ -92,42 +93,83 @@
retry
end
end
+ ##
# Marks the given member as being unoccupied so
# that it may be occupied again in the future.
+ #
def release member
@pool << member
end
end
##
- # Sends the given message (Rumai::IXP::Fcall) and returns its response.
+ # Sends the given request (Rumai::IXP::Fcall) and returns
+ # a ticket that you can use later to receive the reply.
#
- # This method allows you to perform a 9P2000 transaction without
- # worrying about the details of tag collisions and thread safety.
- #
- def talk request
- # send the request
+ def send request
tag = @tag_pool.obtain
- bay = @recv_bays[tag]
request.tag = tag
- output = request.to_9p
- @send_lock.synchronize do
- @stream << output
+ @stream << request.to_9p
+
+ tag
+ end
+
+ ##
+ # Returns the reply for the given ticket, which was previously given
+ # to you when you sent the corresponding request (Rumai::IXP::Fcall).
+ #
+ def recv tag
+ loop do
+ reply = @recv_lock.synchronize do
+ if @recv_buf.key? tag
+ @recv_buf.delete tag
+ else
+ # reply was not in the receive buffer, so wait
+ # for the next reply... hoping that it is ours
+ msg = Fcall.from_9p(@stream)
+
+ if msg.tag == tag
+ msg
+ else
+ # we got someone else's reply, so buffer
+ # it (for them to receive) and try again
+ @recv_buf[msg.tag] = msg
+ nil
+ end
+ end
+ end
+
+ if reply
+ @tag_pool.release tag
+
+ if reply.is_a? Rerror
+ raise Error, reply.ename
+ end
+
+ return reply
+ else
+ # give other threads a chance to receive
+ Thread.pass
+ end
end
+ end
- # receive the response
- response = bay.shift
- @tag_pool.release tag
+ ##
+ # Sends the given request (Rumai::IXP::Fcall) and returns its reply.
+ #
+ def talk request
+ tag = send(request)
- if response.is_a? Rerror
- raise Error, "#{response.ename.inspect} in response to #{request.inspect}"
- else
- return response
+ begin
+ recv tag
+ rescue Error => e
+ e.message << " -- in reply to #{request.inspect}"
+ raise
end
end
MODES = {
'r' => Topen::OREAD,
@@ -180,11 +222,11 @@
end
##
# Encapsulates I/O access over a file handle (fid).
#
- # NOTE: this class is NOT thread-safe.
+ # NOTE: this class is NOT thread safe!
#
class FidStream
attr_reader :fid, :stat
attr_reader :eof
@@ -274,12 +316,12 @@
##
# Writes the given content at the current position in this stream.
#
def write content
- raise 'closed streams cannot be written to' if @closed
- raise 'directories cannot be written to' if @stat.directory?
+ raise 'cannot write to a closed stream' if @closed
+ raise 'cannot write to a directory' if @stat.directory?
data = content.to_s
limit = data.length + @pos
while @pos < limit
@@ -308,21 +350,25 @@
f.read(*args)
end
end
##
- # Returns the names of all files inside the directory whose path is given.
+ # Returns the basenames of all files
+ # inside the directory at the given path.
#
+ # See Dir::entries in the Ruby documentation.
+ #
def entries path
unless stat(path).directory?
raise ArgumentError, "#{path.inspect} is not a directory"
end
read(path).map! {|t| t.name}
end
##
- # Returns the content of the file/directory at the given path.
+ # Writes the given content to
+ # the file at the given path.
#
def write path, content
open path, 'w' do |f|
f << content
end