lib/openwfe/participants/socketparticipants.rb in ruote-0.9.18 vs lib/openwfe/participants/socketparticipants.rb in ruote-0.9.19

- old
+ new

@@ -1,34 +1,34 @@ # #-- # Copyright (c) 2007-2008, John Mettraux, OpenWFE.org # All rights reserved. -# -# Redistribution and use in source and binary forms, with or without +# +# Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: -# +# # . Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# . Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation +# list of conditions and the following disclaimer. +# +# . Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. -# +# # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. #++ # # @@ -45,158 +45,158 @@ require 'openwfe/participants/participant' module OpenWFE + # + # This participant implementation dispatches workitems over TCP sockets. + # By default the workitem are dumped as YAML strings, but you can override + # the encode_workitem(wi) method. + # + # A small example : + # + # require 'openwfe/particpants/socketparticipants' + # + # sp = OpenWFE::SocketParticipant.new("target.host.xx", 7007) + # + # engine.register_participant("Alfred", sp) + # + class SocketParticipant + + attr_accessor :host, :port + # - # This participant implementation dispatches workitems over TCP sockets. - # By default the workitem are dumped as YAML strings, but you can override - # the encode_workitem(wi) method. + # The constructor # - # A small example : + def initialize (host, port) + + @host = host + @port = port + end + # - # require 'openwfe/particpants/socketparticipants' + # The method called by the engine for each incoming workitem. + # Will dispatch the workitem over a TCP connection. # - # sp = OpenWFE::SocketParticipant.new("target.host.xx", 7007) - # - # engine.register_participant("Alfred", sp) - # - class SocketParticipant + def consume (workitem) - attr_accessor :host, :port + dispatch(workitem) + end - # - # The constructor - # - def initialize (host, port) + def dispatch (workitem) - @host = host - @port = port - end + socket = TCPSocket.new(@host, @port) + socket.puts encode_workitem(workitem) + socket.puts + socket.close_write - # - # The method called by the engine for each incoming workitem. - # Will dispatch the workitem over a TCP connection. - # - def consume (workitem) + #print "\n__socket.closed? #{socket.closed?}" - dispatch(workitem) - end + reply = fetch_reply(socket) - def dispatch (workitem) + socket.close - socket = TCPSocket.new(@host, @port) - socket.puts encode_workitem(workitem) - socket.puts - socket.close_write + decode_reply(reply) + end - #print "\n__socket.closed? #{socket.closed?}" + # + # A 'static' method for dispatching workitems, you can use it + # directly, without instantiating the SocketParticipant : + # + # require 'openwfe/participants/socketparticipants' + # + # SocketParticipant.dispatch("127.0.0.1", 7007, workitem) + # + def SocketParticipant.dispatch (host, port, workitem) - reply = fetch_reply(socket) + SocketParticipant.new(host, port).dispatch(workitem) + end - socket.close + protected - decode_reply(reply) - end + # + # By default, uses YAML to serialize the workitem + # (of course you can override this method). + # + def encode_workitem (wi) + YAML.dump(wi) + end - # - # A 'static' method for dispatching workitems, you can use it - # directly, without instantiating the SocketParticipant : - # - # require 'openwfe/participants/socketparticipants' - # - # SocketParticipant.dispatch("127.0.0.1", 7007, workitem) - # - def SocketParticipant.dispatch (host, port, workitem) + # + # By default, will just return the reply without touching it + # + def decode_reply (r) + r + end - SocketParticipant.new(host, port).dispatch(workitem) - end + # + # The code that waits for the reply from the server, nicely + # wrapped inside a timeout and a rescue block. + # + def fetch_reply (socket) - protected + reply = "" - # - # By default, uses YAML to serialize the workitem - # (of course you can override this method). - # - def encode_workitem (wi) - YAML.dump(wi) - end + begin - # - # By default, will just return the reply without touching it - # - def decode_reply (r) - r + timeout(7) do + while true + r = socket.gets + break unless r + reply << r end + end - # - # The code that waits for the reply from the server, nicely - # wrapped inside a timeout and a rescue block. - # - def fetch_reply (socket) + rescue Exception => e + puts e + raise "timeout while waiting for reply" + end - reply = "" + reply + end + end - begin + # + # This extension of of SocketParticipant can be used to dispatch + # workitems towards an OpenWFEja instance, but OpenWFEru's SocketListener + # understands XML workitems as well. + # + class XmlSocketParticipant < SocketParticipant - timeout(7) do - while true - r = socket.gets - break unless r - reply << r - end - end + #def initialize (host, port) + # super + #end - rescue Exception => e - puts e - raise "timeout while waiting for reply" - end - - reply - end - end - # - # This extension of of SocketParticipant can be used to dispatch - # workitems towards an OpenWFEja instance, but OpenWFEru's SocketListener - # understands XML workitems as well. + # A 'static' method for dispatching workitems, you can use it + # directly, without instantiating the SocketParticipant : # - class XmlSocketParticipant < SocketParticipant + # require 'openwfe/participants/socketparticipants' + # + # SocketParticipant.dispatch("127.0.0.1", 7007, workitem) + # + def XmlSocketParticipant.dispatch (host, port, workitem) - #def initialize (host, port) - # super - #end + XmlSocketParticipant.new(host, port).consume(workitem) + end - # - # A 'static' method for dispatching workitems, you can use it - # directly, without instantiating the SocketParticipant : - # - # require 'openwfe/participants/socketparticipants' - # - # SocketParticipant.dispatch("127.0.0.1", 7007, workitem) - # - def XmlSocketParticipant.dispatch (host, port, workitem) + protected - XmlSocketParticipant.new(host, port).consume(workitem) - end + # + # This implementation encodes the workitem as an XML document. + # This is compatible with OpenWFEja. + # + def encode_workitem (wi) - protected + sxml = OpenWFE::XmlCodec::encode(wi) - # - # This implementation encodes the workitem as an XML document. - # This is compatible with OpenWFEja. - # - def encode_workitem (wi) - - sxml = OpenWFE::XmlCodec::encode(wi) - - s = "xmlCoder #{sxml.length}\n" - s << "\n" - s << sxml - s << "\n" - s << "\n" - s - end - end + s = "xmlCoder #{sxml.length}\n" + s << "\n" + s << sxml + s << "\n" + s << "\n" + s + end + end end