Sha256: 7a5163c8bebbcffa9ce66bb10ae22c3e03049c7821eb3e4b313c9e477242d366
Contents?: true
Size: 1.99 KB
Versions: 3
Compression:
Stored size: 1.99 KB
Contents
module GnipApi module Apis module PowerTrack class Stream attr_reader :adapter def initialize params = {} @stream = params[:stream] @source = params[:source] set_config end def logger GnipApi.logger end def consume request = create_request adapter.stream_get request do |chunk| @buffer.insert! chunk begin yield process_entries(@buffer.read!) rescue Exception => e puts e.class puts e.message puts e.backtrace[0..10].join("\n") raise e end end end def process_entries entries entries.map!{|e| parse_json(e)}.compact! entries.map!{|e| build_message(e)} log_system_messages(entries) entries end def log_system_messages entries entries.select{|message| message.system_message? }.each do |system_message| GnipApi.logger.send(system_message.log_method, system_message.message) end end def build_message params Gnip::Message.build(params) end def parse_json json begin JSON.parse json rescue JSON::ParserError nil end end private def create_request GnipApi::Request.new_get(endpoint, {'Accept-Encoding' => 'gzip'}) end def set_config raise 'MissingStream' if @stream.nil? raise 'MissingSource' if @source.nil? @user = GnipApi.configuration.user @password = GnipApi.configuration.password @account = GnipApi.configuration.account @adapter = GnipApi::Adapter.new @buffer = GnipApi::Apis::PowerTrack::Buffer.new end def endpoint GnipApi::Endpoints.powertrack_stream(@source, @stream) end end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
gnip_api-0.0.4 | lib/gnip_api/apis/power_track/stream.rb |
gnip_api-0.0.3 | lib/gnip_api/apis/power_track/stream.rb |
gnip_api-0.0.2 | lib/gnip_api/apis/power_track/stream.rb |