Sha256: a7e2e6bf3cff4028e9d8b1f4dea4669b4a2681343685e18bf3d15c13ce24fc6b

Contents?: true

Size: 1.54 KB

Versions: 2

Compression:

Stored size: 1.54 KB

Contents

module DataSift
  class LiveStream < DataSift::ApiResource

    @stream              = nil
    @on_datasift_message = lambda {}

    def initialize (config, stream)
      @config        = config
      @stream        = stream
      @retry_timeout = 0
      @subscriptions = {}
      @connected     = false
    end

    attr_reader :connected, :stream, :retry_timeout, :subscriptions
    attr_writer :connected, :retry_timeout, :on_datasift_message

    def connected?
      @connected
    end

    def fire_ds_message(message)
      hash = false
      if message.has_key?(:hash)
        hash = message[:hash]
      end
      message.merge!({
                         :is_failure => message[:status] == 'failure',
                         :is_success => message[:status] == 'success',
                         :is_warning => message[:status] == 'warning',
                         :is_tick    => message[:status] == 'connected'
                     })
      @on_datasift_message.call(self, message, hash)
    end

    def fire_on_message(hash, interaction)
      callback = @subscriptions[hash]
      if callback == nil
        raise StreamingMessageError.new "no valid on_message callback provided for stream #{hash} with message #{interaction}"
      end
      callback.call(interaction, self, hash)
    end

    def subscribe(hash, on_message)
      @subscriptions[hash] = on_message
      @stream.send "{ \"action\":\"subscribe\",\"hash\":\"#{hash}\"}"
    end

    def unsubscribe hash
      @stream.send "{ \"action\":\"unsubscribe\",\"hash\":\"#{hash}\"}"
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
datasift-3.0.0.beta2 lib/live_stream.rb
datasift-3.0.0.beta lib/live_stream.rb