#
# stream_consumer_http.rb - This file contains the StreamConsumer_HTTP class.
#
# Copyright (C) 2011 MediaSift Ltd
#
# == Overview
#
# The StreamConsumer_HTTP class implements HTTP streaming.

$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../')

require 'uri'
require 'socket'
require 'yajl'
require 'cgi'

module DataSift

	class StreamConsumer_HTTP < StreamConsumer

		# Constructor. Requires valid user and definition objects.
		def initialize(user, definition)
			super
		end

		def onStart(&block)
			begin
				reconnect() unless !@socket.nil? and !@socket.closed?

				parser = Yajl::Parser.new
				parser.on_parse_complete = block if block_given?
				if @response_head[:headers]["Transfer-Encoding"] == 'chunked'
					if block_given?
						chunkLeft = 0
						while !@socket.eof? && (line = @socket.gets) && @state == StreamConsumer::STATE_RUNNING
							break if line.match /^0.*?\r\n/
							next if line == "\r\n"
							size = line.hex
							json = @socket.read(size)
							next if json.nil?
							chunkLeft = size-json.size
							if chunkLeft == 0
								if json.length > 100
									parser << json
								end
							else
								# received only part of the chunk, grab the rest
								parser << @socket.read(chunkLeft)
							end
						end
					else
						raise StreamError, 'Chunked responses detected, but no block given to handle the chunks.'
					end
				else
					content_type = @response_head[:headers]['Content-Type'].split(';')
					content_type = content_type.first
					if ALLOWED_MIME_TYPES.include?(content_type)
						case @response_head[:headers]['Content-Encoding']
						when 'gzip'
							return Yajl::Gzip::StreamReader.parse(@socket, opts, &block)
						when 'deflate'
							return Yajl::Deflate::StreamReader.parse(@socket, opts.merge({:deflate_options => -Zlib::MAX_WBITS}), &block)
						when 'bzip2'
							return Yajl::Bzip2::StreamReader.parse(@socket, opts, &block)
						else
							return parser.parse(@socket)
						end
					else
						raise StreamError, 'Unhandled response MIME type ' + content_type
					end
				end
			end while @auto_reconnect and @state == StreamConsumer::STATE_RUNNING

			disconnect()

			if @state == StreamConsumer::STATE_STOPPING
				@stop_reason = 'Stop requested'
			else
				@stop_reason = 'Connection dropped'
			end

			onStop(@stop_reason)
		end

		def reconnect()
			uri = URI.parse('http://' + User::STREAM_BASE_URL + @definition.hash +
											'?username=' + CGI.escape(@user.username) + '&api_key=' + CGI.escape(@user.api_key))

			user_agent = @user.getUserAgent()

			request = "GET #{uri.path}#{uri.query ? "?"+uri.query : nil} HTTP/1.1\r\n"
			request << "Host: #{uri.host}\r\n"
			request << "User-Agent: #{user_agent}\r\n"
			request << "Accept: */*\r\n"
			request << "\r\n"

			connection_delay = 0

			begin
				# Close the socket if it's open
				disconnect()

				# Back off a bit if required
				sleep(connection_delay) if connection_delay > 0

				begin
					@socket = TCPSocket.new(uri.host, uri.port)

					@socket.write(request)
					@response_head = {}
					@response_head[:headers] = {}

					# Read the headers
					@socket.each_line do |line|
						if line == "\r\n" # end of the headers
							break
						else
							header = line.split(": ")
							if header.size == 1
								header = header[0].split(" ")
								@response_head[:version] = header[0]
								@response_head[:code] = header[1].to_i
								@response_head[:msg] = header[2]
							else
								@response_head[:headers][header[0]] = header[1].strip
							end
						end
					end

					if @response_head[:code] == 200
						# Success!
						@state = StreamConsumer::STATE_RUNNING
					elsif @response_head[:code] == 404
						raise StreamError, 'Hash not found!'
					else
						puts 'Connection failed: ' + @response_head[:code] + ' ' + @response_head[:msg]
						if connection_delay == 0
							connection_delay = 10;
						elsif connection_delay < 240
							connection_delay *= 2;
						else
							raise StreamError, 'Connection failed: ' + @response_head[:code] + ' ' + @response_head[:msg]
						end
					end
				#rescue
				#	if connection_delay == 0
				#		connection_delay = 1
				#	elsif connection_delay <= 16
				#		connection_delay += 1
				#	else
				#		raise StreamError, 'Connection failed due to a network error'
				#	end
				end
			end while @state != StreamConsumer::STATE_RUNNING
		end

		def disconnect()
			@socket.close if !@socket.nil? and !@socket.closed?
		end

	end

end