# module to ingest data to a Wendelin through HTTP endpoint # Copyright (C) 2015 Nexedi SA and Contributors. # Kirill Smelkov # # This program is free software: you can Use, Study, Modify and Redistribute # it under the terms of the GNU General Public License version 3, or (at your # option) any later version, as published by the Free Software Foundation. # # You can also Link and Combine this program with other software covered by # the terms of any of the Free Software licenses or any of the Open Source # Initiative approved licenses and Convey the resulting work. Corresponding # source of such a combination shall include the source code for all other # software used. # # This program is distributed WITHOUT ANY WARRANTY; without even the implied # warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # # See COPYING file for full licensing terms. # See https://www.nexedi.com/licensing for rationale and options. require 'net/http' require 'openssl' # class representing a Wendelin client class WendelinClient # `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint" # `credentials` # {'user' => _, 'password' => _} TODO change to certificate # `log` - logger to use def initialize(streamtool_uri, credentials, log, ssl_timeout, open_timeout, read_timeout, keep_alive_timeout) @streamtool_uri = streamtool_uri @credentials = credentials @log = log @ssl_timeout = ssl_timeout @open_timeout = open_timeout @read_timeout = read_timeout @keep_alive_timeout = keep_alive_timeout end # start request in an independent function to keep the connection open def start_connection(uri) @log.debug "start new connection" @http = Net::HTTP.start(uri.hostname, uri.port, :use_ssl => (uri.scheme == 'https'), :verify_mode => OpenSSL::SSL::VERIFY_NONE, # Net::HTTP default open timeout is infinity, which results # in thread hang forever if other side does not fully # establish connection. Default read_timeout is 60 seconds. # We go safe way and make sure all timeouts are defined. :ssl_timeout => @ssl_timeout, :open_timeout => @open_timeout, :read_timeout => @read_timeout, :keep_alive_timeout => @keep_alive_timeout,) end # ingest `data_chunk` to a stream referenced as `reference` def ingest_with_keep_alive(reference, data_chunk) uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}") # call start_connection if http is undefined if ! defined? @http start_connection(uri) end # connect again if the connection is not started if ! @http.started?() start_connection(uri) end @request = Net::HTTP::Post.new(uri) # When using 'application/x-www-form-urlencoded', Ruby encodes with regex # and it is far too slow. Such POST is legit: # https://stackoverflow.com/a/14710450 @request.body = data_chunk @request.content_type = 'application/octet-stream' if @credentials.has_key?('user') @request.basic_auth @credentials['user'], @credentials['password'] end @log.on_trace do @log.trace '>>> REQUEST' @log.trace "method\t=> #{@request.method}" @log.trace "path\t=> #{@request.path}" @log.trace "uri\t=> #{@request.uri}" @log.trace "body\t=> #{@request.body}" @log.trace "body_stream\t=> #{@request.body_stream}" @request.each {|h| @log.trace "#{h}:\t#{@request[h]}"} @log.trace end begin res = @http.request(@request) # Net::HTTPResponse object end rescue # some http/ssl/other connection error @log.warn "HTTP ERROR:" raise else @log.on_trace do @log.trace '>>> RESPONSE' res.each {|h| @log.trace "#{h}:\t#{res[h]}"} @log.trace "code\t=> #{res.code}" @log.trace "msg\t=> #{res.message}" @log.trace "class\t=> #{res.class}" @log.trace "body:", res.body end if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX #@log.info "ingested ok" else @log.warn "FAIL:" res.value end end def ingest(reference, data_chunk) uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}") req = Net::HTTP::Post.new(uri) if @credentials.has_key?('user') req.basic_auth @credentials['user'], @credentials['password'] end # When using 'application/x-www-form-urlencoded', Ruby encodes with regex # and it is far too slow. Such POST is legit: # https://stackoverflow.com/a/14710450 req.body = data_chunk req.content_type = 'application/octet-stream' @log.on_trace do @log.trace '>>> REQUEST' @log.trace "method\t=> #{req.method}" @log.trace "path\t=> #{req.path}" @log.trace "uri\t=> #{req.uri}" @log.trace "body\t=> #{req.body}" @log.trace "body_stream\t=> #{req.body_stream}" req.each {|h| @log.trace "#{h}:\t#{req[h]}"} @log.trace end begin # TODO keep connection open (so that every new ingest does not do # full connect again) res = Net::HTTP.start(uri.hostname, uri.port, :use_ssl => (uri.scheme == 'https'), # NOTE = "do not check server cert" # TODO move this out to conf parameters :verify_mode => OpenSSL::SSL::VERIFY_NONE, # Net::HTTP default open timeout is infinity, which results # in thread hang forever if other side does not fully # establish connection. Default read_timeout is 60 seconds. # We go safe way and make sure all timeouts are defined. :ssl_timeout => @ssl_timeout, :open_timeout => @open_timeout, :read_timeout => @read_timeout, ) do |http| http.request(req) end rescue # some http/ssl/other connection error @log.warn "HTTP ERROR:" raise else @log.on_trace do @log.trace '>>> RESPONSE' res.each {|h| @log.trace "#{h}:\t#{res[h]}"} @log.trace "code\t=> #{res.code}" @log.trace "msg\t=> #{res.message}" @log.trace "class\t=> #{res.class}" @log.trace "body:", res.body end if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX #@log.info "ingested ok" else @log.warn "FAIL:" res.value end end end end