Sha256: 495deb424cb3beec74957d1d8b9c2c39e8cbfa9a280998f99c1c7258b01c47ba

Contents?: true

Size: 1.04 KB

Versions: 5

Compression:

Stored size: 1.04 KB

Contents

# encoding: utf-8
require "logstash/namespace"
require "logstash/outputs/websocket_topics"

# In-process publish/subscribe hub.
#
# Each call to #subscribe registers a private Queue; #publish pushes the
# published object onto every registered queue, and each subscriber drains its
# own queue by handing the objects to the block it supplied.
class LogStash::Outputs::WebSocket::Pubsub
  # Logger used to report subscribers whose callback raised during #publish.
  # It is only ever set through this accessor, so it may be nil; in that case
  # failures are handled silently.
  attr_accessor :logger

  def initialize
    @subscribers = []
    # Guards every read and write of @subscribers.
    @subscribers_lock = Mutex.new
  end # def initialize

  # Deliver +object+ to every registered subscriber.
  #
  # A subscriber whose callback raises a StandardError is reported to the
  # logger (when one is set) and unregistered, so it is not retried on later
  # publishes. The remaining subscribers still receive +object+.
  def publish(object)
    @subscribers_lock.synchronize do
      break if @subscribers.empty?

      # Collect failures first: @subscribers must not be mutated while it is
      # being iterated.
      failed = []
      @subscribers.each do |subscriber|
        begin
          subscriber.call(object)
        rescue => e
          # Guard against an unset logger; an error raised here would abort
          # delivery to the remaining subscribers.
          @logger.error("Failed to publish to subscriber", :subscriber => subscriber, :exception => e) if @logger
          failed << subscriber
        end
      end

      failed.each do |subscriber|
        @subscribers.delete(subscriber)
      end
    end # @subscribers_lock.synchronize
  end # def publish

  # Register a subscriber and hand it every subsequently published object.
  #
  # This method never returns normally: it blocks the calling thread, popping
  # objects off the subscriber's queue and passing each one to +block+. It
  # ends only when +block+ raises or the thread is terminated, and in either
  # case the subscriber is unregistered so #publish stops filling a queue that
  # nobody reads.
  #
  # Raises ArgumentError when called without a block.
  def subscribe(&block)
    raise ArgumentError, "subscribe requires a block" unless block

    queue = Queue.new
    subscriber = proc do |event|
      queue << event
    end
    @subscribers_lock.synchronize do
      @subscribers << subscriber
    end

    begin
      # `while true` rather than `loop`: Kernel#loop rescues StopIteration,
      # which would silently swallow that exception if +block+ raised it.
      while true
        block.call(queue.pop)
      end
    ensure
      @subscribers_lock.synchronize do
        @subscribers.delete(subscriber)
      end
    end
  end # def subscribe
end # class LogStash::Outputs::WebSocket::Pubsub

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
logstash-output-websocket_topics-2.1.10 lib/logstash/outputs/websocket_topics/pubsub.rb
logstash-output-websocket_topics-2.1.9 lib/logstash/outputs/websocket_topics/pubsub.rb
logstash-output-websocket_topics-2.1.7 lib/logstash/outputs/websocket_topics/pubsub.rb
logstash-output-websocket_topics-2.1.6 lib/logstash/outputs/websocket_topics/pubsub.rb
logstash-output-websocket_topics-2.1.5 lib/logstash/outputs/websocket_topics/pubsub.rb