Sha256: a4ad7b449d4e4d66e5aa5c8d05d4bcd711ec7a80c28cbd8293dee8c8a56674ee

Contents?: true

Size: 1.71 KB

Versions: 1

Compression:

Stored size: 1.71 KB

Contents

module Gnip
  module GnipStream
    # Stream subclass that consumes a Gnip "replay" endpoint over HTTP using
    # EventMachine. An optional date window (+:date_from+ / +:date_to+) can be
    # passed to #consume / #connect and is sent as +fromDate+ / +toDate+ query
    # parameters.
    class Replay < Stream
      # Builds the replay URL for the API version in use.
      #
      # @param client [Object] must respond to +account+, +publisher+ and
      #   +replay_label+; it is also forwarded unchanged to the parent initializer
      # @raise [ArgumentError] when +version+ is neither '1.0' nor '2.0'
      def initialize(client)
        super # the parent initializer is expected to set +version+
        @url =
          case version
          when '1.0'
            "https://stream.gnip.com:443/accounts/#{client.account}/publishers/#{client.publisher}/replay/track/#{client.replay_label}.json"
          when '2.0'
            "https://gnip-stream.gnip.com/replay/powertrack/accounts/#{client.account}/publishers/#{client.publisher}/#{client.replay_label}.json"
          else
            # ArgumentError (a StandardError) instead of a bare Exception, so a
            # plain `rescue` can handle it; `rescue Exception` still catches it.
            raise ArgumentError, "version #{version} is not supported from this gem."
          end
      end

      # Registers the error and connection-close callbacks for this stream.
      # On error, the reconnect attempt is delegated to +@error_handler+
      # (presumably initialised by the parent class -- not visible here).
      def configure_handlers
        on_error { |error| @error_handler.attempt_to_reconnect("Gnip Connection Error. Reason was: #{error.inspect}") }
        on_connection_close { puts 'Gnip::GnipStream::Replay -> Connection closed' }
      end

      # Registers the message callback and opens the connection.
      #
      # When a block is given it replaces any previously stored callback;
      # otherwise the previously stored one (if any) is reused.
      #
      # @param options [Hash] forwarded to #connect (+:date_from+, +:date_to+)
      # @yield each message, as dispatched by the parent class's +on_message+ hook
      def consume(options = {}, &block)
        @client_callback = block if block
        on_message(&@client_callback)
        connect(options)
      end

      # Opens the HTTP stream inside an EventMachine reactor and feeds every
      # received chunk to +process_chunk+. The reactor is stopped once the
      # request either completes or fails.
      #
      # @param options [Hash] optional replay window
      # @option options [Object] :date_from lower bound, formatted via Gnip.format_date
      # @option options [Object] :date_to   upper bound, formatted via Gnip.format_date
      def connect(options = {})
        search_options = {}
        search_options[:fromDate] = Gnip.format_date(options[:date_from]) if options[:date_from]
        search_options[:toDate]   = Gnip.format_date(options[:date_to])   if options[:date_to]

        # Only append a query string when there is one; otherwise the URL
        # would end in a dangling '?'.
        # NOTE(review): Hash#to_query is presumably provided by ActiveSupport -- confirm.
        stream_url = search_options.empty? ? url : [url, search_options.to_query].join('?')

        EM.run do
          http = EM::HttpRequest.new(stream_url, inactivity_timeout: 45, connection_timeout: 75).get(head: @headers)
          http.stream { |chunk| process_chunk(chunk) }
          http.callback do
            handle_connection_close(http)
            EM.stop
          end
          http.errback do
            handle_error(http)
            EM.stop
          end
        end
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
gnip-client-0.2.12 lib/gnip/gnip-stream/replay.rb