Sha256: c71447f646a2d1ba538734db4f204b90583a4fb36451b58c9ea193e12fd65606

Contents?: true

Size: 1.2 KB

Versions: 1

Compression:

Stored size: 1.2 KB

Contents

require 'dripdrop/collector'
require 'em-synchrony'
require 'em-websocket'
require 'uri'
require 'json'

class DripDrop
  # Collector subclass that fans every received message out, as JSON, to the
  # websocket connections currently registered with it.
  class PublisherCollector < Collector
    # Registered websocket connections; the list is created on first access.
    def websockets
      @websockets ||= []
    end

    # Registers +ws+ so it receives subsequent messages.
    # Returns the list of registered connections.
    def add_websocket(ws)
      websockets.push(ws)
    end

    # Unregisters +ws+. Returns the removed connection, or nil if it was not
    # registered.
    def rem_websocket(ws)
      registered = websockets
      registered.delete(ws)
    end

    # Serializes +message+ (via its hash form) to JSON once and sends that
    # same payload to every registered connection.
    def on_recv(message)
      payload = message.to_hash.to_json
      websockets.each do |socket|
        socket.send(payload)
      end
    end
  end

  # Relays messages gathered by a PublisherCollector to WebSocket clients.
  #
  # A PublisherCollector is created for the subscription address; every
  # WebSocket connection accepted on the WebSocket address is registered with
  # that collector when it opens and unregistered when it closes.
  class Publisher
    attr_reader :sub_address, :sub_collector, :ws_address

    # sub_address - String address handed to the PublisherCollector
    #               (default: 'tcp://127.0.0.1:2901').
    # ws_address  - String URI the WebSocket server listens on
    #               (default: 'ws://127.0.0.1:2902').
    #
    # Both addresses are exposed through the readers as parsed URI objects.
    def initialize(sub_address = 'tcp://127.0.0.1:2901', ws_address = 'ws://127.0.0.1:2902')
      @sub_address   = URI.parse(sub_address)
      @ws_address    = URI.parse(ws_address)
      # Hand the caller's address to the collector. It used to be hard-coded
      # to the default, so a custom sub_address was silently ignored.
      @sub_collector = PublisherCollector.new(sub_address)
    end

    # Runs the collector, then starts a WebSocket server on ws_address inside
    # an EventMachine.synchrony block.
    def run
      # NOTE(review): Collector#run is defined outside this file; this call
      # presumably returns so the reactor below can start -- confirm.
      @sub_collector.run
      EventMachine.synchrony do
        host = @ws_address.host
        port = @ws_address.port.to_i
        EventMachine::WebSocket.start(:host => host, :port => port, :debug => true) do |ws|
          # Keep the collector's connection list in sync with the socket
          # lifecycle so messages only go to open connections.
          ws.onopen do
            @sub_collector.add_websocket(ws)
          end
          ws.onclose do
            @sub_collector.rem_websocket(ws)
          end
        end
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
dripdrop-0.0.2 lib/dripdrop/publisher.rb