#--
# =============================================================================
# Copyright (c) 2004,2005 Jamis Buck (jamis@37signals.com)
# All rights reserved.
#
# This source file is distributed as part of the Net::SSH Secure Shell Client
# library for Ruby. This file (and the library as a whole) may be used only as
# allowed by either the BSD license, or the Ruby license (or, by association
# with the Ruby license, the GPL). See the "doc" subdirectory of the Net::SSH
# distribution for the texts of these licenses.
# -----------------------------------------------------------------------------
# net-ssh website : http://net-ssh.rubyforge.org
# project website: http://rubyforge.org/projects/net-ssh
# =============================================================================
#++

require 'thread'
require 'net/ssh/connection/constants'
require 'net/ssh/errors'

module Net
  module SSH
    module Connection

      # Drives the connection layer on top of a transport session: it keeps
      # track of the channels that are open, queues outgoing global requests
      # and channel data, and dispatches every incoming message to the
      # matching +do_*+ handler (see MESSAGES at the bottom of the class).
      class Driver
        include Constants

        # A structure for representing global requests, as registered by the
        # #global_request method.
        Request = Struct.new( :type, :data, :callback )

        # A structure for representing a data buffer that must be sent
        # across a channel.
        DataRequest = Struct.new( :channel, :data, :type )

        #--
        # ====================================================================
        # CONSTRUCTOR
        # ====================================================================
        #++

        # Create a new connection driver that communicates over the given
        # transport +session+. +log+ is the logger instance to write log
        # messages to, +buffers+ is a buffer factory (it must respond to
        # #writer), and +factories+ is a collection indexed by <tt>:open</tt>
        # and <tt>:create</tt> whose values are callables that return new
        # channel instances.
        def initialize( session, log, buffers, factories )
          @session = session
          @log = log
          @buffers = buffers
          @factories = factories

          @channel_id_mutex = Mutex.new
          @next_channel_id = 0

          @channel_map = Hash.new
          @request_queue = Array.new
          @channel_open_handlers = Hash.new

          @data_requests = Array.new
          @data_requests_mutex = Mutex.new
        end

        #--
        # ====================================================================
        # CHANNEL MANAGEMENT
        # ====================================================================
        #++

        # Open and return a new channel. This returns immediately, before the
        # server confirms that the channel was opened. When the server sends
        # the confirmation, the +on_confirm+ callback will be invoked.
        def open_channel( type, extra_data=nil, &on_confirm )
          channel = @factories[:open].call( type, extra_data )
          # explicit parentheses keep "&" from being parsed as a binary operator
          channel.on_confirm_open( &on_confirm )
          @channel_map[ channel.local_id ] = channel
        end

        # Remove the given channel from the connection.
        def remove_channel( channel )
          @channel_map.delete channel.local_id
        end

        # Returns an array of active channels.
        def channels
          @channel_map.values
        end

        # Add a callback to be invoked when a channel-open request is received
        # for a channel of the given type. The handler-id is returned; it is
        # the 1-based position of the callback in the list of handlers for
        # +type+, which is what #remove_channel_open_handler expects.
        def add_channel_open_handler( type, &block )
          handlers = ( @channel_open_handlers[ type ] ||= Array.new )
          handlers.push block
          # the id must be relative to this type's list, not to the number of
          # distinct types that have handlers registered
          handlers.length
        end

        # Remove a callback with the given id for channel-open requests of the
        # given type. The slot is set to +nil+ (rather than deleted) so that
        # the ids of the other handlers stay valid. Unknown types and ids that
        # were never handed out are ignored. Always returns +nil+.
        def remove_channel_open_handler( type, id )
          handlers = @channel_open_handlers[ type ]
          return unless handlers && id.between?( 1, handlers.length )
          handlers[ id-1 ] = nil
        end

        # Return the next available channel id for this connection. This
        # method is thread-safe. The first id returned is 1.
        def allocate_channel_id
          @channel_id_mutex.synchronize do
            @next_channel_id += 1
          end
        end

        # Register a data buffer (of an optional type) to be sent across the
        # given channel at the next available opportunity.
        #
        # This is used internally by channels to hide the window size and
        # maximum packet size from the client. Clients should not call this
        # method directly.
        def register_data_request( channel, data, type=nil )
          @data_requests_mutex.synchronize do
            @data_requests << DataRequest.new( channel, data, type )
          end

          # make sure the new data request has a chance to be sent to the
          # server... Otherwise, it cannot be sent until the next time #process
          # is invoked, which can be unexpected in synchronous situations.
          process_data_requests
        end

        #--
        # ====================================================================
        # CONNECTION PROCESSING
        # ====================================================================
        #++

        # Repeatedly call #process for as long as the given block returns
        # +true+. If no block is given, then the loop continues until there
        # are no more open channels on this connection (channels of type
        # 'auth-agent@openssh.com' are not counted).
        def loop( &block )
          block ||= proc do
            open_channels = @channel_map.reject do |id, channel|
              channel.type == 'auth-agent@openssh.com'
            end
            !open_channels.empty?
          end

          process while block.call
        end

        # Wait for and dispatch a single event. If +nonblock+ is false (the
        # default) this will block until a message has been received. Otherwise,
        # it will return immediately when no message is ready to be read.
        #
        # Pending data requests are flushed first. Raises Net::SSH::Exception
        # when the message type has no entry in MESSAGES. Returns +self+.
        def process( nonblock=false )
          process_data_requests

          if !nonblock || reader_ready?
            type, response = @session.wait_for_message

            unless ( dispatcher = MESSAGES[ type ] )
              raise Net::SSH::Exception,
                "Unexpected response type '#{type}', (#{response.inspect})"
            end

            dispatcher[:method].bind( self ).call( response )
          end

          self
        end

        #--
        # ====================================================================
        # COMMUNICATION
        # ====================================================================
        #++

        # Send a global request packet to the server. This returns immediately.
        # The given block will be invoked when the server responds. The
        # request is always sent with the want-reply flag set, and replies are
        # matched to requests in the order the requests were sent.
        def global_request( type, data=nil, &block )
          writer = @buffers.writer
          writer.write_byte GLOBAL_REQUEST
          writer.write_string type.to_s
          writer.write_bool true
          writer.write data.to_s if data
          @session.send_message writer

          @request_queue.push Request.new( type, data, block )
          self
        end

        # Send a channel request packet to the server. The packet is written
        # with a channel id of 0 and without asking for a confirmation.
        def channel_request( type )
          writer = @buffers.writer
          writer.write_byte CHANNEL_REQUEST
          writer.write_long 0 # channel id
          writer.write_string type
          writer.write_byte 0 # want_confirm

          @session.send_message writer
        end

        # A convenience method for sending messages. Returns +self+.
        def send_message( msg )
          @session.send_message msg
          self
        end

        # Delegates to the #reader_ready? method of the transport session.
        def reader_ready?
          @session.reader_ready?
        end

        # Sends an innocuous packet to the server to test the connection. Can
        # be used to defeat timeouts on long-running commands.
        def ping!
          @session.ping!
        end

        #--
        # ====================================================================
        # MESSAGE HANDLERS
        # ====================================================================
        #++

        # Handle a GLOBAL_REQUEST sent by the server. The request itself is
        # only logged; when the server asked for a reply, REQUEST_SUCCESS is
        # sent back.
        def do_global_request( response )
          name = response.read_string
          want_reply = response.read_bool
          # read so the payload is consumed; it is not otherwise used here
          request_data = response.remainder_as_buffer

          @log.debug "GLOBAL_REQUEST received (#{name})" if @log.debug?

          if want_reply
            writer = @buffers.writer
            writer.write_byte REQUEST_SUCCESS
            @session.send_message writer
          end
        end

        # Handle a REQUEST_SUCCESS reply to the oldest pending global request.
        def do_request_success( response )
          @log.debug "REQUEST_SUCCESS received" if @log.debug?
          process_request response, true
        end

        # Handle a REQUEST_FAILURE reply to the oldest pending global request.
        def do_request_failure( response )
          @log.debug "REQUEST_FAILURE received" if @log.debug?
          process_request response, false
        end

        # Handle a CHANNEL_OPEN request from the server. A channel is built
        # through the <tt>:create</tt> factory and offered to every handler
        # registered for its type (each is called with this driver, the
        # channel and the remaining response buffer). When at least one
        # handler exists the channel is registered and confirmed; otherwise
        # Net::SSH::Exception is raised.
        def do_channel_open( response )
          ch_type = response.read_string
          @log.debug "CHANNEL_OPEN recieved (#{ch_type})" if @log.debug?
          handled = false

          sender_channel = response.read_long
          window_size = response.read_long
          packet_size = response.read_long

          channel = @factories[:create].call( ch_type, sender_channel,
            window_size, packet_size )

          ( @channel_open_handlers[ ch_type ] || [] ).each do |handler|
            # removed handlers leave a nil slot behind
            next unless handler
            handled = true
            handler.call( self, channel, response )
          end

          unless handled
            raise Net::SSH::Exception,
              "cannot handle request to open a channel of type '#{ch_type}'"
          end

          @channel_map[channel.local_id] = channel

          writer = @buffers.writer
          writer.write_byte CHANNEL_OPEN_CONFIRMATION
          writer.write_long channel.remote_id
          writer.write_long channel.local_id
          writer.write_long 0x7FFFFFFF # window size
          writer.write_long 0x7FFFFFFF # maximum packet size

          @session.send_message writer
        end

        # Handle a CHANNEL_OPEN_FAILURE: the channel is dropped from the
        # connection and told why the open failed.
        def do_channel_open_failure( response )
          local_id = response.read_long
          reason_code = response.read_long
          reason = response.read_string
          language = response.read_string

          @log.debug "CHANNEL_OPEN_FAILURE recieved (#{reason})" if @log.debug?

          channel = @channel_map[ local_id ]
          @channel_map.delete local_id
          channel.do_confirm_failed reason_code, reason, language
        end

        # Handle a CHANNEL_OPEN_CONFIRMATION by passing the remote id, window
        # size and packet size on to the channel that was being opened.
        def do_channel_open_confirmation( response )
          local_id = response.read_long
          remote_id = response.read_long
          window_size = response.read_long
          packet_size = response.read_long

          if @log.debug?
            @log.debug "CHANNEL_OPEN_CONFIRMATION recieved (#{local_id})"
          end

          channel = @channel_map[ local_id ]
          channel.do_confirm_open remote_id, window_size, packet_size
        end

        # Handle a CHANNEL_WINDOW_ADJUST by forwarding the number of bytes to
        # add to the addressed channel.
        def do_channel_window_adjust( response )
          local_id = response.read_long
          bytes_to_add = response.read_long

          if @log.debug?
            @log.debug "CHANNEL_WINDOW_ADJUST recieved " +
              "(#{local_id}:#{bytes_to_add})"
          end

          @channel_map[ local_id ].do_window_adjust( bytes_to_add )
        end

        # Handle CHANNEL_DATA by forwarding the payload to the addressed
        # channel.
        def do_channel_data( response )
          local_id = response.read_long
          data = response.read_string

          if @log.debug?
            @log.debug "CHANNEL_DATA recieved (#{local_id}:#{data.inspect})"
          end

          @channel_map[ local_id ].do_data data
        end

        # Handle CHANNEL_EXTENDED_DATA by forwarding the data type and the
        # payload to the addressed channel.
        def do_channel_extended_data( response )
          local_id = response.read_long
          data_type = response.read_long
          data = response.read_string

          if @log.debug?
            @log.debug "CHANNEL_EXTENDED_DATA recieved " +
              "(#{local_id}:#{data_type}:#{data.inspect})"
          end

          @channel_map[ local_id ].do_extended_data data_type, data
        end

        # Handle CHANNEL_EOF by notifying the addressed channel.
        def do_channel_eof( response )
          local_id = response.read_long
          @log.debug "CHANNEL_EOF recieved (#{local_id})" if @log.debug?
          @channel_map[ local_id ].do_eof
        end

        # Handle CHANNEL_CLOSE by closing the addressed channel (+close+ is
        # called with +false+).
        def do_channel_close( response )
          local_id = response.read_long
          @log.debug "CHANNEL_CLOSE recieved (#{local_id})" if @log.debug?
          @channel_map[ local_id ].close false
        end

        # Handle a CHANNEL_REQUEST by forwarding the request name, the
        # want-reply flag and the remaining payload to the addressed channel.
        def do_channel_request( response )
          local_id = response.read_long
          request = response.read_string
          want_reply = response.read_bool
          request_data = response.remainder_as_buffer

          if @log.debug?
            @log.debug "CHANNEL_REQUEST recieved (#{local_id}:#{request})"
          end

          @channel_map[ local_id ].do_request request, want_reply, request_data
        end

        # Handle CHANNEL_SUCCESS by notifying the addressed channel.
        def do_channel_success( response )
          local_id = response.read_long
          @log.debug "CHANNEL_SUCCESS recieved (#{local_id})" if @log.debug?
          @channel_map[ local_id ].do_success
        end

        # Handle CHANNEL_FAILURE by notifying the addressed channel.
        def do_channel_failure( response )
          local_id = response.read_long
          @log.debug "CHANNEL_FAILURE recieved (#{local_id})" if @log.debug?
          @channel_map[ local_id ].do_failure
        end

        #--
        # ====================================================================
        # INTERNAL STATE MANAGEMENT
        # ====================================================================
        private
        #++

        # Process a response received from a "global_request". The oldest
        # queued request is removed and its callback (if any) is invoked with
        # the +success+ flag and the response. A reply that arrives while no
        # request is pending is ignored.
        def process_request( response, success )
          request = @request_queue.shift
          request.callback.call( success, response ) if request && request.callback
        end

        # Process all pending data requests. Each request is sent in packets
        # for as long as its channel reports a positive window size; whatever
        # could not be sent stays queued for the next call.
        def process_data_requests
          # guard against recursive calls
          return if @processing_data_requests

          # guard against simultaneous calls
          @data_requests_mutex.synchronize do
            begin
              @processing_data_requests = true
              @data_requests.map! do |req|
                while req && req.channel.window_size > 0
                  # the send methods return the unsent remainder, or a false
                  # value once everything has gone out
                  remaining = if req.type
                      req.channel.send_extended_data_packet( req.type, req.data )
                    else
                      req.channel.send_data_packet( req.data )
                    end

                  if remaining
                    req.data = remaining
                  else
                    req = nil
                  end
                end
                req
              end
              # completed requests were mapped to nil above
              @data_requests.compact!
            ensure
              @processing_data_requests = false
            end
          end
        end

        #--
        # ====================================================================
        # MESSAGE DISPATCHER SETUP
        # ====================================================================
        #++

        # Maps each message-type constant from Constants to a hash holding the
        # event name (<tt>:name</tt>) and the unbound +do_*+ handler method
        # (<tt>:method</tt>) that #process binds and calls.
        MESSAGES = %w(
          global_request
          request_success
          request_failure
          channel_open
          channel_open_failure
          channel_open_confirmation
          channel_window_adjust
          channel_data
          channel_extended_data
          channel_eof
          channel_close
          channel_request
          channel_success
          channel_failure
        ).inject({}) do |map, event|
          constant = Constants.const_get event.upcase.to_sym
          map[constant] = { :name => event.to_sym,
                            :method => instance_method( "do_#{event}".to_sym ) }
          map
        end

      end

    end
  end
end