lib/rumai/ixp/transport.rb in rumai-3.1.0 vs lib/rumai/ixp/transport.rb in rumai-3.2.0

- old
+ new

@@ -3,34 +3,33 @@ # Copyright protects this work. # See LICENSE file for details. #++ require 'rumai/ixp/message' -require 'thread' # for Mutex +require 'thread' # for Mutex and Queue module Rumai module IXP ## - # A thread-safe proxy that multiplexes many + # A thread-safe channel that multiplexes many # threads onto a single 9P2000 connection. # + # The send/recv implementation is based on the XCB cookie approach: + # http://www.x.org/releases/X11R7.5/doc/libxcb/tutorial/#requestsreplies + # class Agent attr_reader :msize + ## + # [stream] + # I/O stream on which a 9P2000 server is listening. + # def initialize stream - @stream = stream - @send_lock = Mutex.new - @recv_bays = Hash.new {|h,k| h[k] = Queue.new } # tag => Queue(message) + @stream = stream - # background thread which continuously receives - # and dispatches messages from the 9P2000 server - Thread.new do - while true - msg = Fcall.from_9p @stream - @recv_bays[msg.tag] << msg - end - end.priority = -1 + @recv_buf = {} # tag => message + @recv_lock = Mutex.new @tag_pool = RangedPool.new(0...BYTE2_MASK) @fid_pool = RangedPool.new(0...BYTE4_MASK) # establish connection with 9P2000 server @@ -69,11 +68,13 @@ @lim = @lim.succ unless range.exclude_end? @pool = Queue.new end + ## # Returns an unoccupied range member from the pool. + # def obtain begin @pool.deq true rescue ThreadError @@ -92,42 +93,83 @@ retry end end + ## # Marks the given member as being unoccupied so # that it may be occupied again in the future. + # def release member @pool << member end end ## - # Sends the given message (Rumai::IXP::Fcall) and returns its response. + # Sends the given request (Rumai::IXP::Fcall) and returns + # a ticket that you can use later to receive the reply. # - # This method allows you to perform a 9P2000 transaction without - # worrying about the details of tag collisions and thread safety. - # - def talk request - # send the request + def send request tag = @tag_pool.obtain - bay = @recv_bays[tag] request.tag = tag - output = request.to_9p - @send_lock.synchronize do - @stream << output + @stream << request.to_9p + + tag + end + + ## + # Returns the reply for the given ticket, which was previously given + # to you when you sent the corresponding request (Rumai::IXP::Fcall). + # + def recv tag + loop do + reply = @recv_lock.synchronize do + if @recv_buf.key? tag + @recv_buf.delete tag + else + # reply was not in the receive buffer, so wait + # for the next reply... hoping that it is ours + msg = Fcall.from_9p(@stream) + + if msg.tag == tag + msg + else + # we got someone else's reply, so buffer + # it (for them to receive) and try again + @recv_buf[msg.tag] = msg + nil + end + end + end + + if reply + @tag_pool.release tag + + if reply.is_a? Rerror + raise Error, reply.ename + end + + return reply + else + # give other threads a chance to receive + Thread.pass + end end + end - # receive the response - response = bay.shift - @tag_pool.release tag + ## + # Sends the given request (Rumai::IXP::Fcall) and returns its reply. + # + def talk request + tag = send(request) - if response.is_a? Rerror - raise Error, "#{response.ename.inspect} in response to #{request.inspect}" - else - return response + begin + recv tag + rescue Error => e + e.message << " -- in reply to #{request.inspect}" + raise end end MODES = { 'r' => Topen::OREAD, @@ -180,11 +222,11 @@ end ## # Encapsulates I/O access over a file handle (fid). # - # NOTE: this class is NOT thread-safe. + # NOTE: this class is NOT thread safe! # class FidStream attr_reader :fid, :stat attr_reader :eof @@ -274,12 +316,12 @@ ## # Writes the given content at the current position in this stream. # def write content - raise 'closed streams cannot be written to' if @closed - raise 'directories cannot be written to' if @stat.directory? + raise 'cannot write to a closed stream' if @closed + raise 'cannot write to a directory' if @stat.directory? data = content.to_s limit = data.length + @pos while @pos < limit @@ -308,21 +350,25 @@ f.read(*args) end end ## - # Returns the names of all files inside the directory whose path is given. + # Returns the basenames of all files + # inside the directory at the given path. # + # See Dir::entries in the Ruby documentation. + # def entries path unless stat(path).directory? raise ArgumentError, "#{path.inspect} is not a directory" end read(path).map! {|t| t.name} end ## - # Returns the content of the file/directory at the given path. + # Writes the given content to + # the file at the given path. # def write path, content open path, 'w' do |f| f << content end