lib/job_dispatch/worker/socket.rb in job_dispatch-0.0.2 vs lib/job_dispatch/worker/socket.rb in job_dispatch-0.1.0
- old
+ new
@@ -6,15 +6,18 @@
class Worker
class Socket
attr :socket
+ attr :touch_socket
attr :item_class
def initialize(connect_address, item_klass)
@socket = JobDispatch.context.socket(ZMQ::REQ)
@socket.connect(connect_address)
+ @touch_socket = JobDispatch.context.socket(ZMQ::DEALER)
+ @touch_socket.connect(connect_address)
@item_class = item_klass
end
def poll_item
@poll_item ||= ZMQ::Pollitem(@socket, ZMQ::POLLIN)
@@ -27,11 +30,18 @@
def send_goodbye(queue)
@socket.send(JSON.dump({command: 'goodbye', worker_name: identity}))
end
def close
- @socket.close
+ if @socket
+ @socket.close rescue nil
+ @socket = nil
+ end
+ if @touch_socket
+ @touch_socket.close rescue nil
+ @touch_socket = nil
+ end
end
def identity
@identity ||= begin
hostname = ::Socket.gethostname
@@ -43,12 +53,13 @@
# read an incoming message. The thread will block if there is no readable message.
#
# @return [JobDispatch::Item] the item to be processed (or nil if there isn't a valid job)
def read_item
- json = @socket.recv
begin
+ drain_touch_socket
+ json = @socket.recv
params = JSON.parse(json)
case params["command"]
when "job"
item = item_class.new params["target"], params["method"], *params["parameters"]
when "idle"
@@ -65,10 +76,18 @@
nil
end
item
end
+ # drain any messages that may have been received on the touch socket.
+ def drain_touch_socket
+ loop do
+ message = @touch_socket.recv_nonblock
+ break if message.nil?
+ end
+ end
+
# after execution, send the response.
def send_response(job_id, status, result)
JobDispatch.logger.info "Worker #{Process.pid} completed job_id: #{job_id}: #{status}, result: #{result}"
response = {
command: 'completed',
@@ -84,13 +103,10 @@
hash = {
command: 'touch',
job_id: job_id
}
hash[:timeout] = timeout if timeout
- @socket.send(JSON.dump(hash))
- json = @socket.recv # wait for acknowledgement... this could be done via pub/sub to be asynchronous.
- JSON.parse(json) rescue {:error => "Failed to decode JSON from dispatcher: #{json}"}
+ @touch_socket.send(JSON.dump(hash))
end
-
end
end
end