Sha256: 3f03bb6334deab8c2a1a5d7f1137996a0500050d081e83bfabb1b2604afa286f

Contents?: true

Size: 1.15 KB

Versions: 3

Compression:

Stored size: 1.15 KB

Contents

require 'thread'

module RailsCustomerbeats
  # An instrumenter that does not send notifications. This is used in the
  # AsyncQueue so saving events does not send any notifications, not even
  # for logging.
  class VoidInstrumenter < ::ActiveSupport::Notifications::Instrumenter
    def instrument(name, payload={})
      yield(payload) if block_given?
    end
  end

  class AsyncConsumer
    attr_reader :thread

    def initialize(queue=Queue.new, &block)
      @off   = true
      @block = block
      @queue = queue
      @mutex = Mutex.new

      @thread = Thread.new do
        set_void_instrumenter
        consume
      end
    end

    def push(*args)
      @mutex.synchronize { @off = false }
      @queue.push(*args)
    end

    def finished?
      @off
    end

  protected

    def set_void_instrumenter #:nodoc:
      Thread.current[:"instrumentation_#{notifier.object_id}"] = VoidInstrumenter.new(notifier)
    end

    def notifier #:nodoc:
      ActiveSupport::Notifications.notifier
    end

    def consume #:nodoc:
      while args = @queue.shift
        @block.call(args)
        @mutex.synchronize { @off = @queue.empty? }
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
rails_customerbeats-0.0.5 lib/rails_customerbeats/async_consumer.rb
rails_customerbeats-0.0.4 lib/rails_customerbeats/async_consumer.rb
rails_customerbeats-0.3 lib/rails_customerbeats/async_consumer.rb