Sha256: ea0fc4f99ef8d2f542d415ed2aa67ec63fd342db45277843cb61764ffab15808

Contents?: true

Size: 1.6 KB

Versions: 1

Compression:

Stored size: 1.6 KB

Contents

module CronoTrigger
  # Runs a background thread that, on every polling interval, takes one model
  # off a shared queue, fetches that model's executable records in batches and
  # posts each record to an executor. The model is put back on the queue after
  # each poll so it can be picked up again.
  class PollingThread
    # @param model_queue queue of models to poll; must support a non-blocking
    #   `pop(true)` and `<<`
    # @param stop_flag object responding to `wait_for_set(timeout)`; a truthy
    #   return value ends the polling loop
    # @param logger object responding to `info`, `debug` and `error`
    # @param executor object responding to `post { ... }`
    def initialize(model_queue, stop_flag, logger, executor)
      @model_queue = model_queue
      @stop_flag = stop_flag
      @logger = logger
      @executor = executor
    end

    # Starts the polling thread and returns it.
    #
    # Each iteration waits on the stop flag for
    # `CronoTrigger.config.polling_interval`, then pops one model without
    # blocking, polls it, and pushes it back onto the queue.
    def run
      @thread = Thread.start do
        @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread"
        until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval)
          # `until` does not open a new scope, so `model` would otherwise keep
          # the value from the previous iteration. If `pop` then raised on an
          # empty queue, the `ensure` below would push that stale model back a
          # second time and duplicate it in the queue.
          model = nil
          begin
            model = @model_queue.pop(true)
            poll(model)
          rescue ThreadError => e
            # A non-blocking pop on an empty queue is expected; stay quiet.
            @logger.error(e) unless e.message == "queue empty"
          rescue => e
            @logger.error(e)
          ensure
            # Return the model popped in THIS iteration, even if poll failed.
            @model_queue << model if model
          end
        end
      end
    end

    # Waits for the polling thread to finish. Returns nil when `run` has not
    # been called yet instead of failing on a missing thread.
    def join
      @thread.join if @thread
    end

    # Fetches executable records of +model+ batch by batch and posts each one
    # to the executor, until a batch comes back empty.
    #
    # The id of the last record in a batch is passed as `primary_key_offset`
    # when requesting the next batch.
    #
    # @param model object responding to `connection_pool` and
    #   `executables_with_lock(primary_key_offset:)`
    # @return [nil]
    def poll(model)
      @logger.debug "(polling-thread-#{Thread.current.object_id}) Poll #{model}"
      primary_key_offset = nil

      loop do
        records = []
        model.connection_pool.with_connection do
          records = model.executables_with_lock(primary_key_offset: primary_key_offset)
          primary_key_offset = records.last && records.last.id
        end
        break if records.empty?

        records.each { |record| post_execution(model, record) }
      end
    end

    private

    # Posts a task to the executor that runs +record+ inside a connection
    # checked out from the model's connection pool and logs any failure.
    def post_execution(model, record)
      @executor.post do
        model.connection_pool.with_connection do
          @logger.info "(executor-thread-#{Thread.current.object_id}) Execute #{record.class}-#{record.id}"
          begin
            record.do_execute
          rescue Exception => e
            # NOTE(review): rescues Exception rather than StandardError, so
            # nothing raised by a job escapes into the executor. Presumably
            # deliberate best-effort behavior -- confirm before narrowing.
            @logger.error(e)
          end
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
crono_trigger-0.3.2 lib/crono_trigger/polling_thread.rb