Sha256: 10d241427d70204b72e4cfb8fa3940774e3023dfb7207c0fa33def25fa39c5f7

Contents?: true

Size: 1.19 KB

Versions: 16

Compression:

Stored size: 1.19 KB

Contents

require 'mkit/app/model/mkit_job'
require 'mkit/utils'

module MKIt
  class JobManager
    def initialize
      @workers = {}
    end

    def register_worker(worker, topics)
      topics.each { | topic |
        @workers[topic] ||= []
        MKItLogger.info("register #{worker.class} for topic #{topic}")
        @workers[topic] << worker
      }
    end

    def start
      MKItLogger.info('starting job manager')
      @thread = Thread.new do
        loop do
          job = MkitJob.take
          begin
            if job.nil?
              sleep(10)
            else
              topic = job.topic
              job.processing!
              if @workers[topic].nil?
                MKItLogger.warn("no workers found for topic '#{topic}'")
              else
                workers = @workers[topic]
                workers.each { | worker |
                  worker.do_the(job)
                }
              end 
            end
            job.done! unless job.nil?
          rescue Exception => e
            job.error! unless job.nil?
            MKItLogger.error e, e.message, e.backtrace.join("\n")
          end
        end
      end
    end

    def stop
      @thread.exit if @thread
    end
  end
end


Version data entries

16 entries across 16 versions & 1 rubygems

Version Path
mkit-0.9.0 lib/mkit/job_manager.rb
mkit-0.8.0 lib/mkit/job_manager.rb
mkit-0.7.2 lib/mkit/job_manager.rb
mkit-0.7.1 lib/mkit/job_manager.rb
mkit-0.7.0 lib/mkit/job_manager.rb
mkit-0.6.3 lib/mkit/job_manager.rb
mkit-0.6.2 lib/mkit/job_manager.rb
mkit-0.6.1 lib/mkit/job_manager.rb
mkit-0.6.0 lib/mkit/job_manager.rb
mkit-0.5.0 lib/mkit/job_manager.rb
mkit-0.4.3 lib/mkit/job_manager.rb
mkit-0.4.2 lib/mkit/job_manager.rb
mkit-0.4.1 lib/mkit/job_manager.rb
mkit-0.4.0 lib/mkit/job_manager.rb
mkit-0.3.0 lib/mkit/job_manager.rb
mkit-0.2.0 lib/mkit/job_manager.rb