Sha256: 7a5163c8bebbcffa9ce66bb10ae22c3e03049c7821eb3e4b313c9e477242d366

Contents?: true

Size: 1.99 KB

Versions: 3

Compression:

Stored size: 1.99 KB

Contents

module GnipApi
  module Apis
    module PowerTrack
      class Stream
        attr_reader :adapter
        
        def initialize params = {}
          @stream = params[:stream]
          @source = params[:source]
          set_config
        end
        
        def logger
          GnipApi.logger
        end

        def consume
          request = create_request
          adapter.stream_get request do |chunk|
            @buffer.insert! chunk
            begin
              yield process_entries(@buffer.read!)
            rescue Exception => e
              puts e.class
              puts e.message
              puts e.backtrace[0..10].join("\n")
              raise e
            end
          end
        end 

        def process_entries entries
          entries.map!{|e| parse_json(e)}.compact!
          entries.map!{|e| build_message(e)}
          log_system_messages(entries)
          entries
        end

        def log_system_messages entries
          entries.select{|message| message.system_message? }.each do |system_message|
            GnipApi.logger.send(system_message.log_method, system_message.message)
          end
        end

        def build_message params
          Gnip::Message.build(params)
        end

        def parse_json json
          begin 
            JSON.parse json
          rescue JSON::ParserError
            nil
          end
        end

        private
        def create_request 
          GnipApi::Request.new_get(endpoint, {'Accept-Encoding' => 'gzip'})
        end

        def set_config
          raise 'MissingStream' if @stream.nil?
          raise 'MissingSource' if @source.nil?
          @user = GnipApi.configuration.user
          @password = GnipApi.configuration.password
          @account = GnipApi.configuration.account
          @adapter = GnipApi::Adapter.new
          @buffer = GnipApi::Apis::PowerTrack::Buffer.new
        end

        def endpoint
          GnipApi::Endpoints.powertrack_stream(@source, @stream)
        end

      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
gnip_api-0.0.4 lib/gnip_api/apis/power_track/stream.rb
gnip_api-0.0.3 lib/gnip_api/apis/power_track/stream.rb
gnip_api-0.0.2 lib/gnip_api/apis/power_track/stream.rb