Sha256: d3d7b9c46d55f3ae22c7591a3460e97d1ce9459ea8feb28fdc1ee87918562c2f
Contents?: true
Size: 1.96 KB
Versions: 2
Compression:
Stored size: 1.96 KB
Contents
module GnipApi module Apis module PowerTrack class Stream attr_reader :adapter def initialize params = {} @stream = params[:stream] || GnipApi.config.label set_config end def logger GnipApi.logger end def consume request = create_request adapter.stream_get request do |chunk| @buffer.insert! chunk begin yield process_entries(@buffer.read!) rescue Exception => e puts e.class puts e.message puts e.backtrace[0..10].join("\n") raise e end end end def process_entries entries entries.map!{|e| parse_json(e)}.compact! entries.map!{|e| build_message(e)} log_system_messages(entries) entries end def log_system_messages entries entries.select{|message| message.system_message? }.each do |system_message| GnipApi.logger.send(system_message.log_method, system_message.message) end end def build_message params Gnip::Message.build(params) end def parse_json json begin GnipApi::JsonParser.new.parse json rescue GnipApi::Errors::JsonParser::ParseError nil end end private def create_request GnipApi::Request.new_get(endpoint, {'Accept-Encoding' => 'gzip'}) end def set_config raise 'MissingStream' if @stream.nil? @user = GnipApi.configuration.user @password = GnipApi.configuration.password @account = GnipApi.configuration.account @adapter = GnipApi::Adapter.new @buffer = GnipApi::Apis::PowerTrack::Buffer.new end def endpoint GnipApi::Endpoints.powertrack_stream(@stream) end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
gnip_api-1.0.1 | lib/gnip_api/apis/power_track/stream.rb |
gnip_api-1.0.0 | lib/gnip_api/apis/power_track/stream.rb |