Sha256: 0a5565545a26b3cf1cd351514cff13afc29ad0d33c25424e7a1a31cb106c790c

Contents?: true

Size: 1.06 KB

Versions: 1

Compression:

Stored size: 1.06 KB

Contents

# encoding: utf-8
require "json"
require "logstash/namespace"
require "logstash/outputs/websocket_topics"
require "sinatra/base"
require "rack/handler/ftw" # from ftw
require "ftw/websocket/rack" # from ftw

# Sinatra application serving the websocket endpoint of the websocket_topics
# output. A client connects to "/" and sends JSON control messages to choose
# which channels it receives events from:
#
#   {"type": "subscribe-topic", "topic": "<name>"}  subscribe to one channel
#   {"type": "subscribe-all"}                       subscribe to every channel
#                                                   currently in the channel map
#
# Messages that cannot be parsed as a JSON object, that carry any other type,
# or that name a topic without a channel are ignored instead of raising out of
# the connection's read loop.
class LogStash::Outputs::WebSocket::App < Sinatra::Base
  # @param channels [#[], #values] lookup from topic name to channel object;
  #   each channel must respond to #subscribe taking a block
  # @param logger [#debug, #warn] logger used for connection diagnostics
  def initialize(channels, logger)
    # Let Sinatra::Base initialize its own instance state before adding ours.
    super()
    @channels = channels
    @logger = logger
  end

  set :reload_templates, false

  get "/" do
    # TODO(sissel): Support filters/etc.
    ws = ::FTW::WebSocket::Rack.new(env)
    @logger.debug("New websocket client")
    stream(:keep_open) do |_out|
      # Each payload read from the client is treated as a control message.
      ws.each { |payload| handle_client_message(ws, payload) }
    end # stream

    ws.rack_response
  end # get /

  private

  # Applies one control message received from a client.
  #
  # @param ws [::FTW::WebSocket::Rack] websocket the message arrived on
  # @param payload [String] raw message text sent by the client
  # @return [void]
  def handle_client_message(ws, payload)
    message = parse_client_message(payload)
    return unless message

    case message['type']
    when 'subscribe-topic'
      channel = @channels[message['topic']]
      if channel
        forward_events(channel, ws)
      else
        # No channel is registered under this name; skip rather than call
        # #subscribe on nil.
        @logger.warn("Ignoring subscription to unknown topic: #{message['topic'].inspect}")
      end
    when 'subscribe-all'
      @channels.values.each { |channel| forward_events(channel, ws) }
    end
  end

  # Parses a client payload, accepting only JSON objects.
  #
  # @param payload [String] raw message text sent by the client
  # @return [Hash, nil] the decoded object, or nil when the payload is not
  #   valid JSON or is not a JSON object
  def parse_client_message(payload)
    message = JSON.parse(payload)
    return message if message.is_a?(Hash)

    @logger.warn("Ignoring websocket message that is not a JSON object")
    nil
  rescue JSON::ParserError, TypeError => e
    # The payload comes from the remote client, so bad input is expected and
    # must not terminate the read loop.
    @logger.warn("Ignoring malformed websocket message: #{e.message}")
    nil
  end

  # Subscribes to +channel+ and publishes every event it yields to +ws+.
  #
  # @param channel [#subscribe] channel to subscribe to
  # @param ws [#publish] websocket that receives the events
  # @return [Object] whatever channel.subscribe returns
  def forward_events(channel, ws)
    channel.subscribe { |event| ws.publish(event) }
  end
end # class LogStash::Outputs::WebSocket::App

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
logstash-output-websocket_topics-2.1.10 lib/logstash/outputs/websocket_topics/app.rb