Sha256: d3d7b9c46d55f3ae22c7591a3460e97d1ce9459ea8feb28fdc1ee87918562c2f

Contents?: true

Size: 1.96 KB

Versions: 2

Compression:

Stored size: 1.96 KB

Contents

module GnipApi
  module Apis
    module PowerTrack
      class Stream
        attr_reader :adapter
        
        def initialize params = {}
          @stream = params[:stream] || GnipApi.config.label
          set_config
        end
        
        def logger
          GnipApi.logger
        end

        def consume
          request = create_request
          adapter.stream_get request do |chunk|
            @buffer.insert! chunk
            begin
              yield process_entries(@buffer.read!)
            rescue Exception => e
              puts e.class
              puts e.message
              puts e.backtrace[0..10].join("\n")
              raise e
            end
          end
        end 

        def process_entries entries
          entries.map!{|e| parse_json(e)}.compact!
          entries.map!{|e| build_message(e)}
          log_system_messages(entries)
          entries
        end

        def log_system_messages entries
          entries.select{|message| message.system_message? }.each do |system_message|
            GnipApi.logger.send(system_message.log_method, system_message.message)
          end
        end

        def build_message params
          Gnip::Message.build(params)
        end

        def parse_json json
          begin 
            GnipApi::JsonParser.new.parse json
          rescue GnipApi::Errors::JsonParser::ParseError
            nil
          end
        end

        private
        def create_request 
          GnipApi::Request.new_get(endpoint, {'Accept-Encoding' => 'gzip'})
        end

        def set_config
          raise 'MissingStream' if @stream.nil?
          @user = GnipApi.configuration.user
          @password = GnipApi.configuration.password
          @account = GnipApi.configuration.account
          @adapter = GnipApi::Adapter.new
          @buffer = GnipApi::Apis::PowerTrack::Buffer.new
        end

        def endpoint
          GnipApi::Endpoints.powertrack_stream(@stream)
        end

      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
gnip_api-1.0.1 lib/gnip_api/apis/power_track/stream.rb
gnip_api-1.0.0 lib/gnip_api/apis/power_track/stream.rb