lib/job_dispatch/broker.rb in job_dispatch-0.0.1 vs lib/job_dispatch/broker.rb in job_dispatch-0.0.2
- old
+ new
@@ -12,11 +12,11 @@
WORKER_IDLE_TIME = 10.123
POLL_TIME = 5.123
STOP_SIGNALS = %w[INT TERM KILL]
- IdleWorker = Struct.new :worker_id, :idle_since, :queue, :worker_name
+ IdleWorker = Struct.new :worker_id, :idle_since, :queue, :worker_name, :idle_count
# any object that will respond to `next_job_for_queue`, which should return a job, or nil if there
# are no jobs for that queue. The returned job should be a JSONable object that will be sent to the worker.
# This should include `target`, `action` and `parameters` keys.
@@ -65,10 +65,11 @@
def run
begin
puts "JobDispatch::Broker running in process #{Process.pid}"
JobDispatch.logger.info("JobDispatch::Broker running in process #{Process.pid}")
@running = true
+ @running_thread = Thread.current
poller = ZMQ::Poller.new
@socket = JobDispatch::Broker::Socket.new(@worker_bind_address)
@socket.connect
poller.register(@socket.poll_item)
@@ -97,23 +98,33 @@
if STOP_SIGNALS.include?(signal_name)
JobDispatch.logger.info("JobDispatch::Broker shutting down, due to #{signal_name} signal")
puts "JobDispatch::Broker shutting down, due to #{signal_name} signal"
@running = false
@status = "SHUTDOWN"
- sleep 1
+ # sleep 1
process_quit
- sleep 1
+ sleep 1 # let ZMQ send the messages before we close the socket.
end
+ rescue StandardError => e
+ JobDispatch.logger.error "Unexpected exception: #{e}"
end
end
ensure
@socket.disconnect if @socket
@socket = nil
end
end
+ def stop
+ if running?
+ @running = false
+ @running_thread.raise SignalException.new("TERM") unless Thread.current == @running_thread
+ end
+ end
+
+
def process_messages(poller)
# TODO: calculate the amount of time to sleep to wake up such that a scheduled event happens as close
# as possible to the time it was supposed to happen. This could additionally mean that the POLL_TIME
# could be arbitrarily large. As any communication with the broker will wake it immediately.
poll_time = POLL_TIME
@@ -156,11 +167,11 @@
begin
case command.command
when "ready"
# add to list of workers who are ready for work
- add_available_worker(command)
+ add_available_worker(command, 0)
# don't reply, leaves worker blocked waiting for a job to do.
reply = nil
when "goodbye"
@@ -170,11 +181,11 @@
# process completed job.
handle_completed_job(command)
if command.worker_ready?
# a completed job also means the worker is available for more work.
- add_available_worker(command)
+ add_available_worker(command, 1)
reply = nil
else
reply.parameters = {:status => 'thanks'}
end
@@ -233,13 +244,14 @@
end
def send_idle_commands(idle_time=nil)
idle_time ||= Time.now
idle_time -= WORKER_IDLE_TIME
- idle_workers = @workers_waiting_for_jobs.select { |worker_id, worker| worker.idle_since < idle_time }
+ idle_workers = @workers_waiting_for_jobs.select { |worker_id, worker| worker.idle_since < idle_time || worker.idle_count == 0 }
idle_workers.each do |worker_id, worker|
send_job_to_worker(InternalJob.new('idle', worker.queue), worker_id)
+ worker.idle_count += 1
end
end
def send_job_to_worker(job, worker_id)
@@ -266,37 +278,48 @@
send_command(command)
end
# add a worker to the list of workers available for jobs.
- def add_available_worker(command)
+ def add_available_worker(command, idle_count=0)
JobDispatch.logger.info("JobDispatch::Broker Worker '#{command.worker_id.to_json}' available for work on queue '#{command.queue}'")
+
+ # immediately remove any existing workers with the given name. If a worker has closed its connection and opened
+ # a new one (perhaps it started a long time before the broker did)
+
+ if command.worker_name # this is only sent on initial requests.
+ remove_worker_named(command.worker_name)
+ end
+
queue = command.queue
- idle_worker = IdleWorker.new(command.worker_id, Time.now, queue, command.worker_name)
+ idle_worker = IdleWorker.new(command.worker_id, Time.now, queue, command.worker_name, idle_count)
workers_waiting_for_jobs[command.worker_id] = idle_worker
queues[queue] << command.worker_id
if command.worker_name # this is only sent on initial requests.
worker_names[command.worker_id] = command.worker_name
end
end
# remove a worker from available list. Worker is shutting down or indicating that it will no longer
# be available for doing work.
def remove_available_worker(command)
- JobDispatch.logger.info("JobDispatch::Broker Worker '#{command.worker_id.to_json}' available for work on queue '#{command.queue}'")
+ JobDispatch.logger.info("JobDispatch::Broker Removing Worker '#{command.worker_id.to_json}' available for work on queue '#{command.queue}'")
# the goodbye command is sent by another socket connection, so the worker_id (socket identity) will
- # not match the socket actually waiting for work.
+ # not match the socket actually waiting for work. Remove the worker by its name, not socket identity
- keys = worker_names.select { |id, name| name == command.worker_name }.keys
+ remove_worker_named(command.worker_name)
+ {status: "see ya later"}
+ end
+
+ def remove_worker_named(worker_name)
+ keys = worker_names.select { |id, name| name == worker_name }.keys
keys.each do |worker_id|
workers_waiting_for_reply.delete(worker_id) # socket will be closing, no need to send it anything.
worker = workers_waiting_for_jobs.delete(worker_id)
queues[worker.queue].delete(worker_id) if worker
worker_names.delete(worker_id)
end
-
- {status: "see ya later"}
end
def dispatch_jobs_to_workers
# dequeue jobs from database for each queue
@queues.each_pair do |queue, worker_ids|