lib/job_dispatch/broker.rb in job_dispatch-0.0.1 vs lib/job_dispatch/broker.rb in job_dispatch-0.0.2

- old
+ new

@@ -12,11 +12,11 @@ WORKER_IDLE_TIME = 10.123 POLL_TIME = 5.123 STOP_SIGNALS = %w[INT TERM KILL] - IdleWorker = Struct.new :worker_id, :idle_since, :queue, :worker_name + IdleWorker = Struct.new :worker_id, :idle_since, :queue, :worker_name, :idle_count # any object that will respond to `next_job_for_queue`, which should return a job, or nil if there # are no jobs for that queue. The returned job should be a JSONable object that will be sent to the worker. # This should include `target`, `action` and `parameters` keys. @@ -65,10 +65,11 @@ def run begin puts "JobDispatch::Broker running in process #{Process.pid}" JobDispatch.logger.info("JobDispatch::Broker running in process #{Process.pid}") @running = true + @running_thread = Thread.current poller = ZMQ::Poller.new @socket = JobDispatch::Broker::Socket.new(@worker_bind_address) @socket.connect poller.register(@socket.poll_item) @@ -97,23 +98,33 @@ if STOP_SIGNALS.include?(signal_name) JobDispatch.logger.info("JobDispatch::Broker shutting down, due to #{signal_name} signal") puts "JobDispatch::Broker shutting down, due to #{signal_name} signal" @running = false @status = "SHUTDOWN" - sleep 1 + # sleep 1 process_quit - sleep 1 + sleep 1 # let ZMQ send the messages before we close the socket. end + rescue StandardError => e + JobDispatch.logger.error "Unexpected exception: #{e}" end end ensure @socket.disconnect if @socket @socket = nil end end + def stop + if running? + @running = false + @running_thread.raise SignalException.new("TERM") unless Thread.current == @running_thread + end + end + + def process_messages(poller) # TODO: calculate the amount of time to sleep to wake up such that a scheduled event happens as close # as possible to the time it was supposed to happen. This could additionally mean that the POLL_TIME # could be arbitrarily large. As any communication with the broker will wake it immediately. poll_time = POLL_TIME @@ -156,11 +167,11 @@ begin case command.command when "ready" # add to list of workers who are ready for work - add_available_worker(command) + add_available_worker(command, 0) # don't reply, leaves worker blocked waiting for a job to do. reply = nil when "goodbye" @@ -170,11 +181,11 @@ # process completed job. handle_completed_job(command) if command.worker_ready? # a completed job also means the worker is available for more work. - add_available_worker(command) + add_available_worker(command, 1) reply = nil else reply.parameters = {:status => 'thanks'} end @@ -233,13 +244,14 @@ end def send_idle_commands(idle_time=nil) idle_time ||= Time.now idle_time -= WORKER_IDLE_TIME - idle_workers = @workers_waiting_for_jobs.select { |worker_id, worker| worker.idle_since < idle_time } + idle_workers = @workers_waiting_for_jobs.select { |worker_id, worker| worker.idle_since < idle_time || worker.idle_count == 0 } idle_workers.each do |worker_id, worker| send_job_to_worker(InternalJob.new('idle', worker.queue), worker_id) + worker.idle_count += 1 end end def send_job_to_worker(job, worker_id) @@ -266,37 +278,48 @@ send_command(command) end # add a worker to the list of workers available for jobs. - def add_available_worker(command) + def add_available_worker(command, idle_count=0) JobDispatch.logger.info("JobDispatch::Broker Worker '#{command.worker_id.to_json}' available for work on queue '#{command.queue}'") + + # immediately remove any existing workers with the given name. If a worker has closed its connection and opened + # a new one (perhaps it started a long time before the broker did) + + if command.worker_name # this is only sent on initial requests. + remove_worker_named(command.worker_name) + end + queue = command.queue - idle_worker = IdleWorker.new(command.worker_id, Time.now, queue, command.worker_name) + idle_worker = IdleWorker.new(command.worker_id, Time.now, queue, command.worker_name, idle_count) workers_waiting_for_jobs[command.worker_id] = idle_worker queues[queue] << command.worker_id if command.worker_name # this is only sent on initial requests. worker_names[command.worker_id] = command.worker_name end end # remove a worker from available list. Worker is shutting down or indicating that it will no longer # be available for doing work. def remove_available_worker(command) - JobDispatch.logger.info("JobDispatch::Broker Worker '#{command.worker_id.to_json}' available for work on queue '#{command.queue}'") + JobDispatch.logger.info("JobDispatch::Broker Removing Worker '#{command.worker_id.to_json}' available for work on queue '#{command.queue}'") # the goodbye command is sent by another socket connection, so the worker_id (socket identity) will - # not match the socket actually waiting for work. + # not match the socket actually waiting for work. Remove the worker by its name, not socket identity - keys = worker_names.select { |id, name| name == command.worker_name }.keys + remove_worker_named(command.worker_name) + {status: "see ya later"} + end + + def remove_worker_named(worker_name) + keys = worker_names.select { |id, name| name == worker_name }.keys keys.each do |worker_id| workers_waiting_for_reply.delete(worker_id) # socket will be closing, no need to send it anything. worker = workers_waiting_for_jobs.delete(worker_id) queues[worker.queue].delete(worker_id) if worker worker_names.delete(worker_id) end - - {status: "see ya later"} end def dispatch_jobs_to_workers # dequeue jobs from database for each queue @queues.each_pair do |queue, worker_ids|