lib/openwfe/participants/socketparticipants.rb in openwferu-0.9.6 vs lib/openwfe/participants/socketparticipants.rb in openwferu-0.9.7
- old
+ new
@@ -39,12 +39,12 @@
# John Mettraux at openwfe.org
#
require 'yaml'
require 'socket'
+require 'timeout'
-#require 'openwfe/utils'
require 'openwfe/rest/xmlcodec'
require 'openwfe/participants/participant'
#
@@ -78,23 +78,26 @@
@port = port
end
#
# The method called by the engine for each incoming workitem.
+ # Will dispatch the workitem over a TCP connection.
#
def consume (workitem)
+ dispatch(workitem)
+ end
+
+ def dispatch (workitem)
+
socket = TCPSocket.new(@host, @port)
socket.puts encode_workitem(workitem)
+ socket.puts
socket.close_write
- reply = ""
- while true
- r = socket.gets
- break unless r
- reply << r
- end
+ reply = fetch_reply(socket)
+
socket.close
decode_reply(reply)
end
@@ -106,11 +109,11 @@
#
# SocketParticipant.dispatch("127.0.0.1", 7007, workitem)
#
def SocketParticipant.dispatch (host, port, workitem)
- SocketParticipant.new(host, port).consume(workitem)
+ SocketParticipant.new(host, port).dispatch(workitem)
end
protected
#
@@ -125,10 +128,36 @@
# By default, will just return the reply without touching it
#
def decode_reply (r)
r
end
+
+ #
+ # The code that waits for the reply from the server, nicely
+ # wrapped inside a timeout and a rescue block.
+ #
+ def fetch_reply (socket)
+
+ reply = ""
+
+ begin
+
+ timeout(7) do
+ while true
+ r = socket.gets
+ break unless r
+ reply << r
+ end
+ end
+
+ rescue Exception => e
+ puts e
+ raise "timeout while waiting for reply"
+ end
+
+ reply
+ end
end
#
# This extension of of SocketParticipant can be used to dispatch
# workitems towards an OpenWFEja instance, but OpenWFEru's SocketListener
@@ -153,9 +182,13 @@
XmlSocketParticipant.new(host, port).consume(workitem)
end
protected
+ #
+ # This implementation encodes the workitem as an XML document.
+ # This is compatible with OpenWFEja.
+ #
def encode_workitem (wi)
sxml = OpenWFE::xml_encode(wi)
s = "xmlCoder #{sxml.length}\n"