Sha256: c75619c61826fe7fd8be51bc0272aa7b1c5c48eabad46c1354eeb428486b9a1c

Contents?: true

Size: 1.23 KB

Versions: 3

Compression:

Stored size: 1.23 KB

Contents

module Umbra
  class Publisher < SynchronousPublisher
    MAX_QUEUE_SIZE = 100

    class << self
      def call(env, response)
        start_once!

        if @queue.size > MAX_QUEUE_SIZE
          Umbra.logger.warn '[umbra] Publish queue at max - dropping items'
          return
        end

        @queue.push(proc { super(env, response) })
      end

      private

      # rubocop:disable Metrics/MethodLength
      def start_once!
        LOCK.synchronize do
          return if @started == Process.pid

          Umbra.logger.info '[umbra] Starting publishing thread'

          @started = Process.pid
          @queue = Queue.new

          worker_thread = Thread.new do
            while (x = @queue.pop)
              break if x == STOP

              begin
                x.call
              rescue StandardError => e
                Umbra.logger.warn '[umbra] Error in publishing thread'
                Umbra.config.error_handler.call(e)
              end
            end
          end

          at_exit do
            @queue.push(STOP)
            worker_thread.join
          end
        end
      end
      # rubocop:enable Metrics/MethodLength
    end

    STOP = Object.new
    LOCK = Mutex.new
    private_constant :STOP, :LOCK
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
umbra-rb-0.2.0 lib/umbra/publisher.rb
umbra-rb-0.1.5.pre lib/umbra/publisher.rb
umbra-rb-0.1.4.pre lib/umbra/publisher.rb