lib/openwfe/participants/socketparticipants.rb in openwferu-0.9.6 vs lib/openwfe/participants/socketparticipants.rb in openwferu-0.9.7

- old
+ new

@@ -39,12 +39,12 @@ # John Mettraux at openwfe.org # require 'yaml' require 'socket' +require 'timeout' -#require 'openwfe/utils' require 'openwfe/rest/xmlcodec' require 'openwfe/participants/participant' # @@ -78,23 +78,26 @@ @port = port end # # The method called by the engine for each incoming workitem. + # Will dispatch the workitem over a TCP connection. # def consume (workitem) + dispatch(workitem) + end + + def dispatch (workitem) + socket = TCPSocket.new(@host, @port) socket.puts encode_workitem(workitem) + socket.puts socket.close_write - reply = "" - while true - r = socket.gets - break unless r - reply << r - end + reply = fetch_reply(socket) + socket.close decode_reply(reply) end @@ -106,11 +109,11 @@ # # SocketParticipant.dispatch("127.0.0.1", 7007, workitem) # def SocketParticipant.dispatch (host, port, workitem) - SocketParticipant.new(host, port).consume(workitem) + SocketParticipant.new(host, port).dispatch(workitem) end protected # @@ -125,10 +128,36 @@ # By default, will just return the reply without touching it # def decode_reply (r) r end + + # + # The code that waits for the reply from the server, nicely + # wrapped inside a timeout and a rescue block. + # + def fetch_reply (socket) + + reply = "" + + begin + + timeout(7) do + while true + r = socket.gets + break unless r + reply << r + end + end + + rescue Exception => e + puts e + raise "timeout while waiting for reply" + end + + reply + end end # # This extension of of SocketParticipant can be used to dispatch # workitems towards an OpenWFEja instance, but OpenWFEru's SocketListener @@ -153,9 +182,13 @@ XmlSocketParticipant.new(host, port).consume(workitem) end protected + # + # This implementation encodes the workitem as an XML document. + # This is compatible with OpenWFEja. + # def encode_workitem (wi) sxml = OpenWFE::xml_encode(wi) s = "xmlCoder #{sxml.length}\n"