Sha256: ea0fc4f99ef8d2f542d415ed2aa67ec63fd342db45277843cb61764ffab15808
Contents?: true
Size: 1.6 KB
Versions: 1
Compression:
Stored size: 1.6 KB
Contents
module CronoTrigger
  # Background thread that repeatedly takes a model off a shared queue,
  # fetches that model's executable records in batches, and hands each record
  # to an executor for execution.
  class PollingThread
    # @param model_queue [#pop, #<<] queue of models to poll; `pop(true)` is
    #   expected to raise ThreadError when empty (as ::Queue does)
    # @param stop_flag [#wait_for_set] waited on between polls; a truthy
    #   return value ends the polling loop
    #   (presumably a ServerEngine::BlockingFlag -- verify against caller)
    # @param logger [#info, #debug, #error]
    # @param executor [#post] runs the given block, typically on a worker
    #   thread (presumably a thread-pool executor -- verify against caller)
    def initialize(model_queue, stop_flag, logger, executor)
      @model_queue = model_queue
      @stop_flag = stop_flag
      @logger = logger
      @executor = executor
    end

    # Starts the polling thread. Between polls it waits on the stop flag for
    # `CronoTrigger.config.polling_interval`; the loop ends once the flag
    # reports that it has been set.
    #
    # @return [Thread] the started thread
    def run
      @thread = Thread.start do
        @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread"
        poll_next_model until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval)
      end
    end

    # Blocks until the polling thread started by {#run} has finished.
    def join
      @thread.join
    end

    # Fetches executable records of +model+ batch by batch and posts every
    # record to the executor. Fetching stops at the first empty batch.
    #
    # @param model [#connection_pool, #executables_with_lock]
    # @return [void]
    def poll(model)
      @logger.debug "(polling-thread-#{Thread.current.object_id}) Poll #{model}"

      primary_key_offset = nil
      loop do
        records = []
        model.connection_pool.with_connection do
          records = model.executables_with_lock(primary_key_offset: primary_key_offset)
          # The id of the last record of this batch is passed as the offset
          # of the next fetch.
          primary_key_offset = records.last && records.last.id
        end

        records.each { |record| post_execution(model, record) }
        break unless records.any?
      end
    end

    private

    # Runs one polling cycle: pops a model without blocking, polls it, and
    # always returns it to the queue afterwards.
    #
    # This lives in its own method so that +model+ is a fresh local on every
    # cycle. When it was a local of the surrounding loop, a failed pop left
    # the previous cycle's model in the variable and the +ensure+ clause
    # pushed that stale model onto the queue a second time.
    def poll_next_model
      model = @model_queue.pop(true)
      poll(model)
    rescue ThreadError => e
      # A non-blocking pop on an empty queue is expected; stay quiet about it.
      @logger.error(e) unless e.message == "queue empty"
    rescue => e
      @logger.error(e)
    ensure
      @model_queue << model if model
    end

    # Posts the execution of a single record to the executor. The task checks
    # out a connection from the model's pool for the duration of the run.
    def post_execution(model, record)
      @executor.post do
        model.connection_pool.with_connection do
          @logger.info "(executor-thread-#{Thread.current.object_id}) Execute #{record.class}-#{record.id}"
          begin
            record.do_execute
          rescue Exception => e
            # NOTE(review): rescuing Exception is broader than StandardError;
            # kept as-is so any failure of a record is logged rather than
            # escaping the executor task -- confirm this is intended.
            @logger.error(e)
          end
        end
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
crono_trigger-0.3.2 | lib/crono_trigger/polling_thread.rb |