Sha256: 10d241427d70204b72e4cfb8fa3940774e3023dfb7207c0fa33def25fa39c5f7
Contents?: true
Size: 1.19 KB
Versions: 16
Compression:
Stored size: 1.19 KB
Contents
require 'mkit/app/model/mkit_job'
require 'mkit/utils'

module MKIt
  # Polls MkitJob for pending jobs on a background thread and hands each job
  # to every worker registered for the job's topic.
  #
  # Usage:
  #   manager = MKIt::JobManager.new
  #   manager.register_worker(worker, %w[topic_a topic_b])
  #   manager.start
  #   ...
  #   manager.stop
  class JobManager
    # Seconds to wait before polling again when no job was obtained.
    POLL_INTERVAL = 10

    def initialize
      # topic => Array of workers registered for that topic
      @workers = {}
    end

    # Registers +worker+ as a handler for each topic in +topics+.
    #
    # worker - object responding to #do_the(job)
    # topics - enumerable of topic identifiers (matched against job.topic)
    def register_worker(worker, topics)
      topics.each do |topic|
        @workers[topic] ||= []
        MKItLogger.info("register #{worker.class} for topic #{topic}")
        @workers[topic] << worker
      end
    end

    # Starts the background polling thread.
    #
    # Returns the Thread running the polling loop.
    def start
      MKItLogger.info('starting job manager')
      @thread = Thread.new do
        loop { process_next_job }
      end
    end

    # Terminates the polling thread, if one was started.
    def stop
      @thread.exit if @thread
    end

    private

    # Takes one job (if any) and runs it through its workers.
    #
    # The fetch itself is inside the rescued region: a failure in
    # MkitJob.take must not terminate the polling thread.
    def process_next_job
      job = MkitJob.take
      if job.nil?
        sleep(POLL_INTERVAL)
      else
        dispatch(job)
        job.done!
      end
    rescue StandardError, ScriptError => e
      # ScriptError is included so that a worker raising NotImplementedError
      # is still reported as a failed job. Process-level exceptions
      # (SystemExit, SignalException, NoMemoryError, ...) are deliberately
      # not swallowed.
      # Log first so the original failure is never lost if updating the job
      # state fails as well.
      MKItLogger.error e, e.message, e.backtrace.join("\n")
      mark_failed(job)
      # No job means the fetch itself failed; back off instead of spinning.
      sleep(POLL_INTERVAL) if job.nil?
    end

    # Marks +job+ as processing and passes it to every worker registered for
    # its topic. A job whose topic has no workers only produces a warning;
    # the caller still marks it done afterwards.
    def dispatch(job)
      topic = job.topic
      job.processing!
      workers = @workers[topic]
      if workers.nil?
        MKItLogger.warn("no workers found for topic '#{topic}'")
      else
        workers.each { |worker| worker.do_the(job) }
      end
    end

    # Flags +job+ as errored. A failure while doing so is logged rather than
    # raised, so the polling loop keeps running.
    def mark_failed(job)
      job.error! unless job.nil?
    rescue StandardError => e
      MKItLogger.error e, e.message, e.backtrace.join("\n")
    end
  end
end
Version data entries
16 entries across 16 versions & 1 rubygems