Sha256: 69593a33d256c8aa3e1e58d5136b7acc75003aa17e3b916590e33bfd252e8b85

Contents?: true

Size: 1.57 KB

Versions: 3

Compression:

Stored size: 1.57 KB

Contents

# frozen_string_literal: true

##
# FastlyNsq::Feeder is a queue interface wrapper for the manager's thread pool.
# This allows a consumer read loop to post a message directly to a
# processor (FastlyNsq::Listener) with a specified priority.
class FastlyNsq::Feeder
  attr_reader :processor, :priority

  ##
  # Create a FastlyNsq::Feeder
  # @param processor [FastlyNsq::Listener]
  # @param priority [Numeric]
  def initialize(processor, priority)
    @processor = processor
    @priority = priority
  end

  ##
  # Send a message to the processor with specified priority
  #
  # This will +post+ to the FastlyNsq.manager.pool with a queue priority and block
  # that will +call+ed. FastlyNsq.manager.pool is a PriorityThreadPool which is a
  # Concurrent::ThreadPoolExecutor that has @queue which in turn is a priority queue
  # that manages job priority
  #
  # The ThreadPoolExecutor is what actually works the @queue and sends +call+ to the queued Proc.
  # When that code is exec'ed +processer.call(message)+ is run. Processor in this context is
  # a FastlyNsq::Listener
  #
  # The block also will log exceptions here because Concurrent::ThreadPoolExecutor will
  # swallow the exception.
  #
  # @param message [Nsq::Message]
  # @see http://ruby-concurrency.github.io/concurrent-ruby/1.0.5/Concurrent/ThreadPoolExecutor.html#post-instance_method
  # @see Nsq::Connection#read_loop
  def push(message)
    FastlyNsq.manager.pool.post(priority) do
      processor.call(message)
    rescue => ex
      FastlyNsq.logger.error ex
      FastlyNsq.tracer.notice_error ex
      raise ex
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fastly_nsq-1.18.1 lib/fastly_nsq/feeder.rb
fastly_nsq-1.18.0 lib/fastly_nsq/feeder.rb
fastly_nsq-1.17.1 lib/fastly_nsq/feeder.rb