Sha256: e414424e70cbed73b36368c4e2e559e074995832deba1df2d80fd3b3958bc985

Contents?: true

Size: 1.04 KB

Versions: 21

Compression:

Stored size: 1.04 KB

Contents

# encoding: utf-8
require "logstash/namespace"
require "logstash/outputs/websocket"

# In-process publish/subscribe fan-out.
#
# Each call to #subscribe registers a private Queue and then blocks, handing
# every published object to the caller's block. #publish pushes an object to
# every registered subscriber. The subscriber list is only touched while
# holding @subscribers_lock.
class LogStash::Outputs::WebSocket::Pubsub
  # Logger used to report subscriber failures in #publish. It is not set by
  # #initialize; callers assign it after construction. It may be left nil, in
  # which case failures are handled silently.
  attr_accessor :logger

  def initialize
    @subscribers = []
    @subscribers_lock = Mutex.new
  end # def initialize

  # Deliver +object+ to every current subscriber.
  #
  # A subscriber whose callable raises is logged (when a logger is set) and
  # removed from the subscriber list; delivery continues with the remaining
  # subscribers.
  #
  # @param object [Object] the value handed to each subscriber
  def publish(object)
    @subscribers_lock.synchronize do
      break if @subscribers.empty?

      failed = []
      @subscribers.each do |subscriber|
        begin
          subscriber.call(object)
        rescue => e
          # Guard the log call: with no logger assigned, an unguarded call
          # would raise from inside this handler, abort delivery to the
          # remaining subscribers and skip the cleanup below.
          if @logger
            @logger.error("Failed to publish to subscriber", :subscriber => subscriber, :exception => e)
          end
          failed << subscriber
        end
      end

      # Removal happens after the iteration so the list is never mutated
      # while it is being traversed.
      failed.each do |subscriber|
        @subscribers.delete(subscriber)
      end
    end # @subscribers_lock.synchronize
  end # def publish

  # Register a subscriber and block the calling thread, invoking +block+ with
  # each published object in the order it was queued.
  #
  # This method only returns by way of an exception raised from +block+ (or
  # delivered to the calling thread). When that happens the subscriber is
  # deregistered, so later #publish calls do not keep filling a queue that
  # nobody drains.
  #
  # @yieldparam event [Object] an object passed to #publish
  def subscribe(&block)
    queue = Queue.new
    subscriber = proc do |event|
      queue << event
    end

    @subscribers_lock.synchronize do
      @subscribers << subscriber
    end

    begin
      # `while true` rather than Kernel#loop: loop rescues StopIteration, which
      # would silently end the subscription if the block ever raised it.
      while true
        block.call(queue.pop)
      end
    ensure
      # Remove by identity so only this subscription's proc is dropped.
      @subscribers_lock.synchronize do
        @subscribers.delete_if { |registered| registered.equal?(subscriber) }
      end
    end
  end # def subscribe
end # class LogStash::Outputs::WebSocket::Pubsub

Version data entries

21 entries across 21 versions & 4 rubygems

Version Path
logstash-output-websocket-3.1.0 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-3.0.5 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-3.0.4 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-3.0.3 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-3.0.2 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-3.0.1 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-3.0.0 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-2.0.4 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket_topics-2.1.8 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket_topics-2.1.4 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-topics-2.1.3 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-topics-2.1.2 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-2.0.2 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-2.0.1 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-0.1.5 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-0.1.4 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-0.1.3 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-0.1.2 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-0.1.1 lib/logstash/outputs/websocket/pubsub.rb
logstash-output-websocket-0.1.0 lib/logstash/outputs/websocket/pubsub.rb