module Rserve
  # Class providing a TCP/IP connection to an Rserve server.
  #
  # A connection is opened as soon as the object is created (see #initialize)
  # and is used to evaluate R code (#eval, #void_eval), assign values
  # (#assign), detach sessions (#detach, #void_eval_detach) and shut the
  # server down (#shutdown).
  class Connection < Rserve::Engine
    # Last connection that completed #connect. Used on Windows, where Rserve
    # doesn't allow concurrent connections.
    @@connected_object=nil
    include Rserve::Protocol

    # :section: Errors
    RserveNotStartedError=Class.new(StandardError)
    ServerNotAvailableError=Class.new(StandardError)
    IncorrectServerError=Class.new(StandardError)
    IncorrectServerVersionError=Class.new(StandardError)
    IncorrectProtocolError=Class.new(StandardError)
    NotConnectedError=Class.new(StandardError)

    # Eval error. Carries the response packet returned by the failed request.
    class EvalError < RuntimeError
      attr_accessor :request_packet
      def initialize(rp)
        @request_packet=rp
      end
    end

    attr_reader :hostname
    attr_reader :port_number
    attr_reader :protocol
    attr_reader :last_error
    attr_reader :connected
    attr_reader :auth_req
    attr_reader :auth_type
    attr_reader :key
    attr_reader :rt
    attr_reader :s
    attr_reader :port
    attr_reader :session
    attr_writer :transfer_charset
    attr_reader :rsrv_version
    attr_writer :persistent

    # authorization type: plain text
    AT_plain=0
    # authorization type: unix crypt
    AT_crypt=1

    # Make a new local connection.
    # You can provide a hash with options. Options are analogous to the
    # Java client:
    # [+:auth_req+] If authentication is required (false by default)
    # [+:transfer_charset+] Transfer charset ("UTF-8" by default)
    # [+:auth_type+] Type of authentication (AT_plain by default)
    # [+:hostname+] Hostname of Rserve ("127.0.0.1" by default)
    # [+:port_number+] Port number of Rserve (6311 by default)
    # [+:max_tries+] Maximum number of tries before giving up (5 by default)
    # [+:cmd_init+] Command to init Rserve if not initialized ("R CMD Rserve" by default)
    # [+:proc_rserve_ok+] Proc testing if Rserve works (uses system by default)
    # [+:session+] Session to attach to (nil by default); when given, its
    #              host and port replace +:hostname+ and +:port_number+
    #
    # Raises ServerNotAvailableError when the connection is refused although
    # +:proc_rserve_ok+ reports a running Rserve, RserveNotStartedError when
    # Rserve can't be started, and re-raises Errno::ECONNREFUSED once
    # +:max_tries+ is exhausted.
    def initialize(opts=Hash.new)
      # Work on a copy so that #delete doesn't empty the caller's hash.
      opts = opts.dup
      @auth_req         = opts.delete(:auth_req)          || false
      @transfer_charset = opts.delete(:transfer_charset)  || "UTF-8"
      @auth_type        = opts.delete(:auth_type)         || AT_plain
      @hostname         = opts.delete(:hostname)          || "127.0.0.1"
      @port_number      = opts.delete(:port_number)       || 6311
      @max_tries        = opts.delete(:max_tries)         || 5
      @cmd_init         = opts.delete(:cmd_init)          || "R CMD Rserve"
      @proc_rserve_ok   = opts.delete(:proc_rserve_ok)    || lambda { system "killall -s 0 Rserve" }
      @session          = opts.delete(:session)           || nil
      @tries            = 0
      @connected        = false
      unless @session.nil?
        @hostname    = @session.host
        @port_number = @session.port
      end
      begin
        connect
      rescue Errno::ECONNREFUSED
        # Out of attempts: let the original ECONNREFUSED propagate.
        raise if @tries >= @max_tries
        @tries += 1
        if @proc_rserve_ok.call
          # Rserve is running, so the host and/or port must be incorrect.
          raise ServerNotAvailableError, "Rserve started, but not available on #{hostname}:#{port_number}"
        elsif run_server
          # Rserve was not running; we just started it. Wait a moment.
          sleep(0.25)
          retry
        else
          raise RserveNotStartedError, "Can't start Rserve"
        end
      end
    end

    # Opens the socket and either performs the Rserve handshake (new
    # connection) or sends the session key (attaching to a detached session).
    #
    # Raises IncorrectServerError, IncorrectServerVersionError or
    # IncorrectProtocolError when the 32-byte ID string sent by the server is
    # not acceptable.
    def connect
      # On Windows, Rserve doesn't allow concurrent connections.
      # So, we must close the last open connection first.
      if ON_WINDOWS && !@@connected_object.nil?
        @@connected_object.close
      end
      close if @connected

      @s  = TCPSocket::new(@hostname, @port_number)
      @rt = Rserve::Talk.new(@s)
      if @session.nil?
        # Accept first input: eight 4-byte fields.
        input = @s.recv(32).unpack("a4a4a4a4a4a4a4a4")
        raise IncorrectServerError, "Handshake failed: Rsrv signature expected, but received [#{input[0]}]" unless input[0]=="Rsrv"
        @rsrv_version = input[1].to_i
        raise IncorrectServerVersionError, "Handshake failed: The server uses more recent protocol than this client." if @rsrv_version>103
        @protocol = input[2]
        raise IncorrectProtocolError, "Handshake failed: unsupported transfer protocol #{@protocol}, I talk only QAP1." if @protocol!="QAP1"
        # The remaining fields are attributes. They are stored in instance
        # variables so that the auth_req / auth_type / key readers reflect
        # what the server announced (plain locals would be discarded).
        (3..7).each do |i|
          attr = input[i]
          if attr=="ARpt"
            unless @auth_req
              # this method is only a fallback when no other was specified
              @auth_req  = true
              @auth_type = AT_plain
            end
          end
          if attr=="ARuc"
            @auth_req  = true
            @auth_type = AT_crypt
          end
          if attr[0,1]=="K"
            @key = attr[1,3]
          end
        end
      else
        # we have a session to take care of
        @s.write(@session.key.pack("C*"))
        @rsrv_version = session.rsrv_version
      end
      @connected = true
      @@connected_object = self
      @last_error = "OK"
    end

    # Check connection state. Note that currently this state is not checked
    # on-the-spot, that is, if the connection went down by an outside event
    # this is not reflected by the flag.
    # Returns +true+ if this connection is alive.
    def connected?
      @connected
    end

    # Closes the current connection. Returns +true+.
    # Safe to call when no socket was ever opened.
    def close
      if !@s.nil? && !@s.closed?
        @s.close_write
        @s.close_read
      end
      raise "Can't close socket" unless @s.nil? || @s.closed?
      @connected = false
      # Only forget the shared handle if it is ours; another live connection
      # may be registered there.
      @@connected_object = nil if @@connected_object.equal?(self)
      true
    end

    # Get server version as reported during the handshake.
    def get_server_version
      @rsrv_version
    end

    # Evaluates the given command, but does not fetch the result (useful for
    # assignment operations).
    # * @param cmd command/expression string
    # * @return +true+ on success; raises EvalError otherwise
    def void_eval(cmd)
      raise NotConnectedError if !connected? || rt.nil?
      rp = rt.request(:cmd=>Rserve::Protocol::CMD_voidEval, :cont=>cmd+"\n")
      if !rp.nil? && rp.ok?
        true
      else
        raise EvalError.new(rp), "voidEval failed: #{rp.to_s}"
      end
    end

    # Evaluates the given command, detaches the session (see #detach) and
    # closes the connection while the command is being evaluated (requires
    # Rserve 0.4+).
    # Note that a session cannot be attached again until the command was
    # successfully processed. Technically the session is put into listening
    # mode while the command is being evaluated, but accept is called only
    # after the command was evaluated. One commonly used technique to monitor
    # detached working sessions is to use a second connection to poll the
    # status (e.g. create a temporary file and return the full path before
    # detaching, thus allowing new connections to read it).
    # * @param cmd command/expression string.
    # * @return session object that can be used to attach back to the session once the command completed
    def void_eval_detach(cmd)
      raise NotConnectedError if !connected? || rt.nil?
      rp = rt.request(:cmd=>Rserve::Protocol::CMD_detachedVoidEval, :cont=>cmd+"\n")
      if rp.nil? || !rp.ok?
        raise EvalError.new(rp), "detached void eval failed : #{rp.to_s}"
      else
        detached = Rserve::Session.new(self, rp)
        close
        detached
      end
    end

    # Evaluates the given command and retrieves the result.
    # * @param cmd command/expression string
    # * @return R-expression, or nil if the response carries no payload; raises EvalError if the request failed
    def eval(cmd)
      raise NotConnectedError if !connected? || rt.nil?
      rp = rt.request(:cmd=>Rserve::Protocol::CMD_eval, :cont=>cmd+"\n")
      if !rp.nil? && rp.ok?
        parse_eval_response(rp)
      else
        raise EvalError.new(rp), "eval failed: #{rp.to_s}"
      end
    end

    # Extracts the REXP from the response packet of an eval request.
    # Returns nil when the packet content is missing or not longer than the
    # header offset.
    # NOT TESTED
    def parse_eval_response(rp)
      rxo = 0
      pc  = rp.cont
      if rsrv_version>100
        # since 0101 eval responds correctly by using DT_SEXP type/len header which is 4 bytes long
        rxo = 4
        # we should check parameter type (should be DT_SEXP) and fail if it's not
        if pc.nil?
          raise "Error while processing eval output: SEXP (type #{Rserve::Protocol::DT_SEXP}) expected but nil returned"
        elsif pc[0]!=Rserve::Protocol::DT_SEXP && pc[0]!=(Rserve::Protocol::DT_SEXP|Rserve::Protocol::DT_LARGE)
          raise "Error while processing eval output: SEXP (type #{Rserve::Protocol::DT_SEXP}) expected but found result type "+pc[0].to_s+"."
        end
        if pc[0]==(Rserve::Protocol::DT_SEXP|Rserve::Protocol::DT_LARGE)
          rxo = 8 # large data need skip of 8 bytes
        end
        # warning: we are not checking or using the length - we assume that
        # only the one SEXP is returned. This is true for the current
        # CMD_eval implementation, but may not be in the future.
      end
      # pc can still be nil here for servers reporting version <= 100.
      return nil if pc.nil? || pc.length<=rxo
      rx = REXPFactory.new
      rx.parse_REXP(pc, rxo)
      rx.get_REXP
    end

    # Assign a value to a symbol in R. The symbol is created if it doesn't
    # exist already. Strings are sent with #assign_string, REXP objects with
    # #assign_rexp, and anything else is first wrapped with
    # Rserve::REXP::Wrapper.wrap.
    # @param sym symbol name. Currently assign uses the CMD_setSEXP command of
    #   Rserve, i.e. the symbol value is NOT parsed. It is the responsibility
    #   of the user to make sure that the symbol name is valid in R (recall
    #   the difference between a symbol and an expression!). In fact R will
    #   always create the symbol, but it may not be accessible (examples:
    #   "bar\nfoo" or "bar$foo").
    # @param ct contents
    def assign(sym, ct)
      raise NotConnectedError if !connected? || rt.nil?
      case ct
      when String
        assign_string(sym, ct)
      when REXP
        assign_rexp(sym, ct)
      else
        assign_rexp(sym, Rserve::REXP::Wrapper.wrap(ct))
      end
    end

    # Sends +ct+ (a String) as the value of symbol +sym+.
    # Returns the response packet; raises RuntimeError "Assign Failed"
    # otherwise.
    def assign_string(sym, ct)
      symn = sym.unpack("C*")
      ctn  = ct.unpack("C*")
      sl = symn.length+1
      cl = ctn.length+1
      sl = (sl&0xfffffc)+4 if (sl&3)>0  # make sure the symbol length is divisible by 4
      cl = (cl&0xfffffc)+4 if (cl&3)>0  # make sure the content length is divisible by 4
      rq = Array.new(sl+4+cl+4)
      symn.length.times {|i| rq[i+4]=symn[i]}
      ic = symn.length
      # NOTE(review): from this loop down to the request call the extracted
      # source was truncated (text between "<" and ">" was lost). The
      # zero-padding and header writes below were reconstructed from the
      # Java client's assign(String, String) - confirm against upstream.
      while ic<sl
        rq[ic+4] = 0
        ic += 1
      end
      ctn.length.times {|i| rq[i+sl+8]=ctn[i]}
      ic = ctn.length
      while ic<cl
        rq[ic+sl+8] = 0
        ic += 1
      end
      set_hdr(Rserve::Protocol::DT_STRING, sl, rq, 0)
      set_hdr(Rserve::Protocol::DT_STRING, cl, rq, sl+4)
      rp = rt.request(:cmd=>Rserve::Protocol::CMD_setSEXP, :cont=>rq)
      if !rp.nil? && rp.ok?
        rp
      else
        raise "Assign Failed"
      end
    end

    # Sends +rexp+ (a REXP) as the value of symbol +sym+.
    # Returns the response packet; raises RuntimeError "Assign Failed"
    # otherwise.
    def assign_rexp(sym, rexp)
      r  = REXPFactory.new(rexp)
      rl = r.get_binary_length
      symn = sym.unpack("C*")
      sl = symn.length+1
      sl = (sl&0xfffffc)+4 if (sl&3)>0 # make sure the symbol length is divisible by 4
      # Payloads above 0xfffff0 bytes get 12 instead of 8 bytes of headers.
      rq = Array.new(sl+rl+((rl>0xfffff0) ? 12 : 8))
      symn.length.times {|i| rq[i+4]=symn[i]}
      ic = symn.length
      # NOTE(review): from this loop down to get_binary_representation the
      # extracted source was truncated (text between "<" and ">" was lost).
      # Reconstructed from the Java client's assign(String, REXP) - confirm
      # against upstream.
      while ic<sl
        rq[ic+4] = 0
        ic += 1
      end
      set_hdr(Rserve::Protocol::DT_STRING, sl, rq, 0)
      set_hdr(Rserve::Protocol::DT_SEXP, rl, rq, sl+4)
      r.get_binary_representation(rq, sl+((rl>0xfffff0) ? 12 : 8))
      # puts "ASSIGN RQ: #{rq}" if $DEBUG
      rp = rt.request(:cmd=>Rserve::Protocol::CMD_setSEXP, :cont=>rq)
      if !rp.nil? && rp.ok?
        rp
      else
        raise "Assign Failed"
      end
    end

    # Shutdown remote Rserve.
    # Note that some Rserves cannot be shut down from the client side.
    # Returns +true+ on success; raises RuntimeError "Shutdown failed"
    # otherwise.
    def shutdown
      raise NotConnectedError if !connected? || rt.nil?
      rp = rt.request(:cmd=>Rserve::Protocol::CMD_shutdown)
      if !rp.nil? && rp.ok?
        true
      else
        raise "Shutdown failed"
      end
    end

    # Detaches the session and closes the connection (requires Rserve 0.4+).
    # Returns the Rserve::Session built from the server response; passing it
    # as the +:session+ option of a new Connection attaches to it again.
    # Raises RuntimeError "Cannot detach" if the server refuses.
    def detach
      raise NotConnectedError if !connected? || rt.nil?
      rp = rt.request(:cmd=>Rserve::Protocol::CMD_detachSession)
      if !rp.nil? && rp.ok?
        detached = Rserve::Session.new(self, rp)
        close
        detached
      else
        raise "Cannot detach"
      end
    end

    private

    # Runs the +:cmd_init+ command to start Rserve. Returns a truthy value
    # when the command could be launched. On JRuby the command is spawned
    # through Spoon instead of Kernel#system.
    def run_server
      if RUBY_PLATFORM != "java"
        system @cmd_init
      else
        pid = Spoon.spawnp(*@cmd_init.split)
        return false if pid < 0
        Process.waitpid pid
        true
      end
    end
  end
end