Sha256: 086eca5682f2b9e9701515811fa7fab2e8c7c0cf96fad41b8041bf4be22d5e57

Contents?: true

Size: 1.68 KB

Versions: 3

Compression:

Stored size: 1.68 KB

Contents

module CronoTrigger
  # Background poller: repeatedly takes a model off a shared queue, asks it
  # for executable records in batches and hands every record to an executor.
  #
  # Collaborators are duck-typed. As used by this class:
  # * +model_queue+ responds to +pop(non_block)+ and +<<+
  # * +stop_flag+ responds to +wait_for_set(timeout)+; a truthy result ends the loop
  # * +logger+ responds to +info+, +debug+ and +error+
  # * +executor+ responds to +post+ taking a block
  class PollingThread
    def initialize(model_queue, stop_flag, logger, executor)
      @model_queue = model_queue
      @stop_flag = stop_flag
      @logger = logger
      @executor = executor
    end

    # Starts the polling thread and returns it.
    #
    # Each iteration waits on the stop flag for
    # +CronoTrigger.config.polling_interval+, then pops one model without
    # blocking, polls it and pushes it back onto the queue. Errors are logged
    # and do not end the loop; a ThreadError whose message is "queue empty"
    # is not logged.
    def run
      @thread = Thread.start do
        @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread"
        until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval)
          # `until` does not open a new scope, so without this reset a failed
          # pop would leave the previous iteration's model in place and the
          # ensure below would push it onto the queue a second time.
          model = nil
          begin
            model = @model_queue.pop(true)
            poll(model)
          rescue ThreadError => e
            @logger.error(e) unless e.message == "queue empty"
          rescue => e
            @logger.error(e)
          ensure
            @model_queue << model if model
          end
        end
      end
    end

    # Waits for the polling thread to finish.
    # Returns nil when +run+ has not been called yet.
    def join
      @thread && @thread.join
    end

    # Fetches executable records of +model+ batch by batch and posts each
    # record to the executor.
    #
    # The id of the last record of a batch is passed as +primary_key_offset+
    # to the next +executables_with_lock+ call; polling stops once a batch
    # has no records. Errors raised while fetching propagate to the caller.
    def poll(model)
      @logger.debug "(polling-thread-#{Thread.current.object_id}) Poll #{model}"
      primary_key_offset = nil

      loop do
        # Block form of the pool, as used on the executor path: there is no
        # explicit checkin that could receive nil when acquiring fails.
        records = model.connection_pool.with_connection do
          batch = model.executables_with_lock(primary_key_offset: primary_key_offset)
          last_record = batch.last
          primary_key_offset = last_record && last_record.id
          batch
        end

        records.each { |record| post_execution(model, record) }
        break unless records.any?
      end
    end

    private

    # Posts the execution of a single record to the executor.
    def post_execution(model, record)
      @executor.post do
        model.connection_pool.with_connection do
          @logger.info "(executor-thread-#{Thread.current.object_id}) Execute #{record.class}-#{record.id}"
          begin
            record.do_execute
          rescue Exception => e
            # Deliberately broad: whatever do_execute raises is logged here
            # rather than passed on to the executor.
            @logger.error(e)
          end
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
crono_trigger-0.3.0 lib/crono_trigger/polling_thread.rb
crono_trigger-0.2.0 lib/crono_trigger/polling_thread.rb
crono_trigger-0.1.0 lib/crono_trigger/polling_thread.rb