lib/job_dispatch/worker/socket.rb in job_dispatch-0.0.2 vs lib/job_dispatch/worker/socket.rb in job_dispatch-0.1.0

- old
+ new

@@ -6,15 +6,18 @@ class Worker class Socket attr :socket + attr :touch_socket attr :item_class def initialize(connect_address, item_klass) @socket = JobDispatch.context.socket(ZMQ::REQ) @socket.connect(connect_address) + @touch_socket = JobDispatch.context.socket(ZMQ::DEALER) + @touch_socket.connect(connect_address) @item_class = item_klass end def poll_item @poll_item ||= ZMQ::Pollitem(@socket, ZMQ::POLLIN) @@ -27,11 +30,18 @@ def send_goodbye(queue) @socket.send(JSON.dump({command: 'goodbye', worker_name: identity})) end def close - @socket.close + if @socket + @socket.close rescue nil + @socket = nil + end + if @touch_socket + @touch_socket.close rescue nil + @touch_socket = nil + end end def identity @identity ||= begin hostname = ::Socket.gethostname @@ -43,12 +53,13 @@ # read an incoming message. The thread will block if there is no readable message. # # @return [JobDispatch::Item] the item to be processed (or nil if there isn't a valid job) def read_item - json = @socket.recv begin + drain_touch_socket + json = @socket.recv params = JSON.parse(json) case params["command"] when "job" item = item_class.new params["target"], params["method"], *params["parameters"] when "idle" @@ -65,10 +76,18 @@ nil end item end + # drain any messages that may have been received on the touch socket. + def drain_touch_socket + loop do + message = @touch_socket.recv_nonblock + break if message.nil? + end + end + # after execution, send the response. def send_response(job_id, status, result) JobDispatch.logger.info "Worker #{Process.pid} completed job_id: #{job_id}: #{status}, result: #{result}" response = { command: 'completed', @@ -84,13 +103,10 @@ hash = { command: 'touch', job_id: job_id } hash[:timeout] = timeout if timeout - @socket.send(JSON.dump(hash)) - json = @socket.recv # wait for acknowledgement... this could be done via pub/sub to be asynchronous. - JSON.parse(json) rescue {:error => "Failed to decode JSON from dispatcher: #{json}"} + @touch_socket.send(JSON.dump(hash)) end - end end end