class GnipApi::PowerTrack::Stream

File: lib/gnip_api/power_track/stream.rb
Superclass: Object

Handles a stream connection to PowerTrack to receive the data.

There are 3 ways provided to connect to and consume the stream:

  • :common

  • :io

  • :pty

Each method uses a different backend; they are the result of experimenting with ways to mitigate disconnect issues. Each one handles the keep-alive signals differently and works a bit differently at the low level. The recommended method is :common, which is already the default stream_method of the consume methods below, though it is still being polished.

In addition to the methods above, thread_consume offers a further strategy, built on the :common backend, that uses a separate thread to decouple your own processing from reading the stream.

Public Class methods

new ()
# File lib/gnip_api/power_track/stream.rb, line 19
def initialize
  @user = GnipApi.configuration.user
  @password = GnipApi.configuration.password
  @account = GnipApi.configuration.account
  @adapter = GnipApi::Adapter.new
  @buffer = GnipApi::PowerTrack::Buffer.new
  @running = false
end

Public Instance methods

build_message (params)

Builds a Gnip::Message object from the item params received.

# File lib/gnip_api/power_track/stream.rb, line 192
def build_message params
  Gnip::Message.build(params)
end
consume (stream_method=:common)

The stream_method argument selects one of 3 ways of consuming the stream, each returning data slightly differently. The :common method uses a simple HTTParty request that reads chunks and decodes the GZip stream. Its flaw is that Zlib must buffer a certain amount of data before it returns a decoded chunk. A chunk returned by :common may contain more than one object.
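A minimal sketch of the incremental decoding described above, using only Ruby's standard Zlib rather than the gem's adapter (whose source is not shown here). The helper name inflate_chunks, the 8-byte chunk size and the "\r\n" item delimiter are illustrative assumptions. Feeding a gzip payload in small pieces shows that some chunks decode to an empty string until Zlib has enough input:

```ruby
require "zlib"

# Feed gzip-compressed chunks, as they might arrive from the network, into a
# streaming inflater and collect the decoded text produced after each chunk.
def inflate_chunks(chunks)
  inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 16) # +16 tells zlib to expect a gzip wrapper
  decoded = chunks.map { |chunk| inflater.inflate(chunk) }
  inflater.close
  decoded
end

# Two newline-delimited JSON items ("\r\n" as the delimiter is an assumption).
payload = %({"id":1}\r\n{"id":2}\r\n)
chunks = Zlib.gzip(payload).bytes.each_slice(8).map { |slice| slice.pack("C*") }

decoded = inflate_chunks(chunks)
decoded.first              # => "" (the first 8 bytes are still gzip header)
decoded.join.split("\r\n") # => ['{"id":1}', '{"id":2}']
```

Splitting the decoded text into items is a separate step, which is why :common relies on a buffer.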

The :io method uses curl under the hood, combined with IO.popen to capture its stdout. With this method each yield is a single line, corresponding to one object sent through the stream. Curl handles the GZip decoding better; however, reading from the IO buffers up the keep-alive signals because STDOUT is not flushed.
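A minimal sketch of reading a child process line by line with IO.popen, as the :io paragraph describes. The gem runs curl (via GnipApi::Adapter#io_curl_stream, whose flags are not shown here); this sketch runs a short Ruby child instead so it works offline, and the helper name each_io_line is hypothetical. The last line printed shows that the child's stdout is a pipe rather than a terminal, which is why a C stdio program such as curl would typically block-buffer its output there:

```ruby
require "rbconfig"

# Spawn a command connected through a pipe and yield its stdout one line at a time.
def each_io_line(*cmd)
  IO.popen(cmd) do |io|
    io.each_line { |line| yield line.chomp }
  end
end

lines = []
each_io_line(RbConfig.ruby, "-e", 'puts %({"id":1}); puts %({"id":2}); puts $stdout.tty?') do |line|
  lines << line
end
lines # => ['{"id":1}', '{"id":2}', "false"]
```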

The :pty method is an alternative to :io in which stdout is captured as it is produced, using PTY (pseudo-terminal) features. It works almost the same as :io, but the keep-alive signals are captured properly.
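A minimal sketch of the same idea using Ruby's standard PTY library. Because the child's stdout is a pseudo-terminal, C stdio-based programs usually switch to line buffering, so each line becomes readable as soon as it is written. The helper name each_pty_line and the Ruby child standing in for curl are assumptions; on Linux, reading the PTY after the child exits raises Errno::EIO, which the sketch treats as end of stream:

```ruby
require "pty"
require "rbconfig"

# Run a command under a pseudo-terminal and yield each output line as it arrives.
def each_pty_line(*cmd)
  PTY.spawn(*cmd) do |reader, _writer, _pid|
    begin
      # The terminal turns "\n" into "\r\n"; chomp strips either form.
      reader.each_line { |line| yield line.chomp }
    rescue Errno::EIO
      # Linux signals that the child closed its side of the PTY; stop reading.
    end
  end
end

lines = []
each_pty_line(RbConfig.ruby, "-e", 'puts %({"id":1}); puts $stdout.tty?') { |line| lines << line }
lines # => ['{"id":1}', "true"]
```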

# File lib/gnip_api/power_track/stream.rb, line 73
def consume stream_method=:common
  raise ArgumentError, "Block required, none given" unless block_given?
  if stream_method == :common
    read_stream do |data|
      yield(process_entries(data))
    end
  elsif stream_method == :io
    read_io_stream do |data|
      yield(process_entries([data]))
    end
  elsif stream_method == :pty
    read_pty_stream do |data|
      yield(process_entries([data]))
    end
  else 
    raise ArgumentError, "Undefined stream method #{stream_method}"
  end
end
consume_json (stream_method=:common)

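Similar to consume, but parses the JSON into a Hash with no further processing; entries that fail to parse become nil. With :common the block receives an Array of Hashes, while with :io and :pty it receives a single Hash. stream_method accepts the same options as consume.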

# File lib/gnip_api/power_track/stream.rb, line 117
def consume_json stream_method=:common
  raise ArgumentError, "Block required, none given" unless block_given?
  if stream_method == :common
    read_stream do |data|
      yield(data.map{|item| parse_json(item)})
    end
  elsif stream_method == :io
    read_io_stream do |data|
      yield(parse_json(data))
    end
  elsif stream_method == :pty
    read_pty_stream do |data|
      yield(parse_json(data))
    end
  else
    raise ArgumentError, "Undefined stream method #{stream_method}"
  end
end
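In the listing above, the :common branch yields an Array (data.map { ... }), while the :io and :pty branches yield the result of parse_json directly, i.e. a single Hash or nil. If your block should always see the same shape, a small normalizing step helps; json_batch is a hypothetical helper, not part of the gem:

```ruby
# Hypothetical helper: normalize what consume_json yields into an Array of
# Hashes, dropping entries that failed to parse (parse_json returns nil for those).
def json_batch(yielded)
  # Kernel#Array is avoided on purpose: Array({"id" => 1}) returns [["id", 1]].
  list = yielded.is_a?(Array) ? yielded : [yielded]
  list.compact
end

json_batch([{ "id" => 1 }, nil]) # => [{ "id" => 1 }]
json_batch({ "id" => 2 })        # => [{ "id" => 2 }]
json_batch(nil)                  # => []
```

Usage inside a consume_json block would then be json_batch(data).each { |hash| ... } for any stream method.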
consume_raw (stream_method=:common)

Similar to consume, except that it yields the raw JSON without parsing the received data: an Array of strings with :common, a single line with :io and :pty. Use it for faster consumption. stream_method accepts the same options as consume.

# File lib/gnip_api/power_track/stream.rb, line 95
def consume_raw stream_method=:common
  raise ArgumentError, "Block required, none given" unless block_given?
  if stream_method == :common
    read_stream do |data|
      yield(data)
    end
  elsif stream_method == :io
    read_io_stream do |data|
      yield(data)
    end
  elsif stream_method == :pty
    read_pty_stream do |data|
      yield(data)
    end
  else 
    raise ArgumentError, "Undefined stream method #{stream_method}"
  end
end
logger ()

Returns the configured logger.

# File lib/gnip_api/power_track/stream.rb, line 29
def logger
  GnipApi.logger
end
parse_json (json)

Returns a Hash from a parsed JSON string, or nil if the string cannot be parsed.

# File lib/gnip_api/power_track/stream.rb, line 197
def parse_json json
  begin 
    GnipApi::JsonParser.new.parse json
  rescue GnipApi::Errors::JsonParser::ParseError
    nil
  end
end
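A minimal equivalent using Ruby's standard json library in place of GnipApi::JsonParser (whose source is not shown): the parse error is rescued and nil is returned, which is what lets process_entries drop bad items with compact. parse_json_or_nil is a hypothetical name:

```ruby
require "json"

# Parse a JSON string, returning nil instead of raising when the input is
# not valid JSON (for example an item truncated mid-object).
def parse_json_or_nil(json)
  JSON.parse(json)
rescue JSON::ParserError
  nil
end

parse_json_or_nil('{"id":1}') # => { "id" => 1 }
parse_json_or_nil('{"id":')   # => nil
```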
process_entries (entries)

Processes the items received after they have been split up, returning the appropriate Gnip objects. Items that fail to parse are dropped, and system messages are logged.

# File lib/gnip_api/power_track/stream.rb, line 183
def process_entries entries
  logger.debug "PowerTrack Stream: #{entries.size} items received"
  data = entries.map{|e| parse_json(e)}.compact
  data.map!{|e| build_message(e)} 
  data.select(&:system_message?).each(&:log!)
  return data
end
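A self-contained sketch of the same pipeline (parse, drop failures, wrap, log system messages) using the standard json and logger libraries. SketchMessage is a hypothetical stand-in for Gnip::Message, and treating a top-level "error", "warn" or "info" key as marking a system message is an assumption, not taken from the gem:

```ruby
require "json"
require "logger"
require "stringio"

# Hypothetical stand-in for Gnip::Message.
SketchMessage = Struct.new(:payload) do
  # Assumption: system messages carry a top-level "error", "warn" or "info" key.
  def system_message?
    payload.is_a?(Hash) && !(payload.keys & %w[error warn info]).empty?
  end
end

# Parse each entry, drop the ones that fail, wrap the rest and log system messages.
def process_entries_sketch(entries, logger)
  logger.debug "#{entries.size} items received"
  parsed = entries.map do |entry|
    begin
      JSON.parse(entry)
    rescue JSON::ParserError
      nil
    end
  end
  messages = parsed.compact.map { |hash| SketchMessage.new(hash) }
  messages.select(&:system_message?).each { |m| logger.warn(m.payload.to_json) }
  messages
end

log = StringIO.new
messages = process_entries_sketch(
  ['{"id":1}', '{"id":', '{"error":{"message":"Forced Disconnect"}}'],
  Logger.new(log)
)
messages.size # => 2 (the truncated entry was dropped)
```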
read_io_stream ()

Opens the connection to the PowerTrack stream and yields any data received, using the curl IO transfer method.

# File lib/gnip_api/power_track/stream.rb, line 138
def read_io_stream
  request = create_request
  logger.info "Opening PowerTrack parsed stream"
  begin
    @adapter.io_curl_stream(request) do |data|
      yield data
    end
  ensure
    logger.warn "Closing stream"
  end
end
read_pty_stream ()

Opens the connection to the PowerTrack stream and yields any data received, using the curl PTY transfer method.

# File lib/gnip_api/power_track/stream.rb, line 152
def read_pty_stream
  request = create_request
  logger.info "Opening PowerTrack parsed stream"
  begin
    @adapter.pty_curl_stream(request) do |data|
      yield data
    end
  ensure
    logger.warn "Closing stream"
  end
end
read_stream ()

Opens the connection to the PowerTrack stream and yields any data received, using HTTParty and standard net/http. In this case the buffer collects the chunks and then splits them into items.

# File lib/gnip_api/power_track/stream.rb, line 167
def read_stream
  request = create_request
  logger.info "Opening PowerTrack parsed stream"
  begin
    @adapter.stream_get request do |chunk|
      @buffer.insert! chunk
      yield @buffer.read! if block_given?
    end
  ensure
    logger.warn "Closing stream"
    @running = false
  end
end
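The listing above inserts each chunk into @buffer and yields @buffer.read!. The source of GnipApi::PowerTrack::Buffer is not shown, so the following is only a hypothetical line buffer illustrating the idea. It assumes items are delimited by "\r\n", keeps an incomplete trailing item until the rest arrives, and drops blank lines, assumed here to be keep-alive signals. A Mutex guards the data because thread_consume fills and reads the buffer from different threads:

```ruby
# Hypothetical line buffer; not the gem's GnipApi::PowerTrack::Buffer.
class LineBuffer
  DELIMITER = "\r\n" # assumed item delimiter

  def initialize
    @data = +""
    @mutex = Mutex.new
  end

  # Append a raw chunk; it may end in the middle of an item.
  def insert!(chunk)
    @mutex.synchronize { @data << chunk }
  end

  # Return the complete items received so far, keeping any partial tail.
  def read!
    @mutex.synchronize do
      parts = @data.split(DELIMITER, -1) # -1 keeps the trailing (possibly empty) piece
      @data = parts.pop || +""            # incomplete tail, or "" if nothing was buffered
      parts.reject(&:empty?)              # blank lines are treated as keep-alives
    end
  end
end

buf = LineBuffer.new
buf.insert!(%({"a":1}\r\n{"b"))
buf.read! # => ['{"a":1}']
buf.insert!(%(:2}\r\n\r\n))
buf.read! # => ['{"b":2}']
```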
thread_consume ()

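Consumes the stream using a separate streamer thread instead of a simple block. The streamer thread fills the buffer while the given block consumes it periodically, polling every 0.1 seconds while the buffer is empty. If the streamer thread stops, GnipApi::Errors::PowerTrack::StreamDown is raised; if the consuming loop exits for any other reason, the streamer thread is killed.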

# File lib/gnip_api/power_track/stream.rb, line 35
def thread_consume
  streamer = Thread.new do
    logger.info "Starting streamer Thread"
    begin
      read_stream
    ensure
      logger.warn "Streamer exited"
    end
  end

  begin
    loop do
      logger.warn "Streamer is down" unless streamer.alive?
      raise GnipApi::Errors::PowerTrack::StreamDown unless streamer.alive?
      entries = @buffer.read!
      entries.any? ? yield(process_entries(entries)) : sleep(0.1)
    end
  ensure
    streamer.kill if streamer.alive?
  end
end
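A self-contained sketch of the producer/consumer pattern behind thread_consume, with a plain Array and Mutex standing in for the gem's buffer and a local StreamDown class standing in for GnipApi::Errors::PowerTrack::StreamDown (both hypothetical). One deliberate difference: the listing checks streamer.alive? and raises before reading the buffer, so entries buffered just before the streamer dies appear to be dropped; this sketch samples liveness first, drains the buffer, and raises only once it is empty:

```ruby
# Hypothetical stand-in for GnipApi::Errors::PowerTrack::StreamDown.
class StreamDown < StandardError; end

# A streamer thread runs `producer`, which pushes items into a shared buffer;
# the calling thread polls the buffer and yields each non-empty batch.
def thread_consume_sketch(producer)
  buffer = []
  mutex = Mutex.new
  streamer = Thread.new do
    producer.call(->(item) { mutex.synchronize { buffer << item } })
  end

  loop do
    alive = streamer.alive? # sample liveness before draining the buffer
    entries = mutex.synchronize { buffer.shift(buffer.size) }
    if !entries.empty?
      yield entries
    elsif alive
      sleep 0.1 # nothing new yet; poll again shortly
    else
      raise StreamDown, "streamer thread exited"
    end
  end
ensure
  streamer&.kill
end

received = []
down = false
begin
  thread_consume_sketch(->(emit) { 3.times { |i| emit.call(i) } }) do |batch|
    received.concat(batch)
  end
rescue StreamDown
  down = true
end
received # => [0, 1, 2]
```

Sampling alive? before draining guarantees that, once the streamer is seen as dead, every item it pushed is already in the buffer and gets yielded before StreamDown is raised.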