lib/fluent/plugin/wendelin_client.rb in fluent-plugin-wendelin-0.3 vs lib/fluent/plugin/wendelin_client.rb in fluent-plugin-wendelin-0.4

- old
+ new

@@ -13,93 +13,176 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - require 'net/http' require 'openssl' - # class representing a Wendelin client class WendelinClient # `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint" # `credentials` # {'user' => _, 'password' => _} TODO change to certificate # `log` - logger to use - def initialize(streamtool_uri, credentials, log) - @streamtool_uri = streamtool_uri - @credentials = credentials - @log = log + def initialize(streamtool_uri, credentials, log, + ssl_timeout, open_timeout, read_timeout, keep_alive_timeout) + @streamtool_uri = streamtool_uri + @credentials = credentials + @log = log + @ssl_timeout = ssl_timeout + @open_timeout = open_timeout + @read_timeout = read_timeout + @keep_alive_timeout = keep_alive_timeout end + # start request in an independent function to keep the connection open + def start_connection(uri) + + @log.debug "start new connection" + + @http = Net::HTTP.start(uri.hostname, uri.port, + :use_ssl => (uri.scheme == 'https'), + :verify_mode => OpenSSL::SSL::VERIFY_NONE, + + # Net::HTTP default open timeout is infinity, which results + # in thread hang forever if other side does not fully + # establish connection. Default read_timeout is 60 seconds. + # We go safe way and make sure all timeouts are defined. + :ssl_timeout => @ssl_timeout, + :open_timeout => @open_timeout, + :read_timeout => @read_timeout, + :keep_alive_timeout => @keep_alive_timeout,) + end + # ingest `data_chunk` to a stream referenced as `reference` - def ingest(reference, data_chunk) - uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}") - req = Net::HTTP::Post.new(uri) - if @credentials.has_key?('user') - req.basic_auth @credentials['user'], @credentials['password'] - end + def ingest_with_keep_alive(reference, data_chunk) + uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}") + # call start_connection if http is undefined + if ! defined? @http + start_connection(uri) + end - # When using 'application/x-www-form-urlencoded', Ruby encodes with regex - # and it is far too slow. Such POST is legit: - # https://stackoverflow.com/a/14710450 - req.body = data_chunk - req.content_type = 'application/octet-stream' + # connect again if the connection is not started + if ! @http.started?() + start_connection(uri) + end - @log.on_trace do - @log.trace '>>> REQUEST' - @log.trace "method\t=> #{req.method}" - @log.trace "path\t=> #{req.path}" - @log.trace "uri\t=> #{req.uri}" - @log.trace "body\t=> #{req.body}" - @log.trace "body_stream\t=> #{req.body_stream}" - req.each {|h| @log.trace "#{h}:\t#{req[h]}"} - @log.trace - end + @request = Net::HTTP::Post.new(uri) - begin - # TODO keep connection open (so that every new ingest does not do - # full connect again) - res = Net::HTTP.start(uri.hostname, uri.port, - :use_ssl => (uri.scheme == 'https'), - # NOTE = "do not check server cert" - # TODO move this out to conf parameters - :verify_mode => OpenSSL::SSL::VERIFY_NONE, + # When using 'application/x-www-form-urlencoded', Ruby encodes with regex + # and it is far too slow. Such POST is legit: + # https://stackoverflow.com/a/14710450 + @request.body = data_chunk + @request.content_type = 'application/octet-stream' - # Net::HTTP default open timeout is infinity, which results - # in thread hang forever if other side does not fully - # establish connection. Default read_timeout is 60 seconds. - # We go safe way and make sure all timeouts are defined. - :ssl_timeout => 60, - :open_timeout => 60, - :read_timeout => 60, - ) do |http| - http.request(req) - end + if @credentials.has_key?('user') + @request.basic_auth @credentials['user'], @credentials['password'] + end - rescue - # some http/ssl/other connection error - @log.warn "HTTP ERROR:" - raise + @log.on_trace do + @log.trace '>>> REQUEST' + @log.trace "method\t=> #{@request.method}" + @log.trace "path\t=> #{@request.path}" + @log.trace "uri\t=> #{@request.uri}" + @log.trace "body\t=> #{@request.body}" + @log.trace "body_stream\t=> #{@request.body_stream}" + @request.each {|h| @log.trace "#{h}:\t#{@request[h]}"} + @log.trace + end - else - @log.on_trace do - @log.trace '>>> RESPONSE' - res.each {|h| @log.trace "#{h}:\t#{res[h]}"} - @log.trace "code\t=> #{res.code}" - @log.trace "msg\t=> #{res.message}" - @log.trace "class\t=> #{res.class}" - @log.trace "body:", res.body - end + begin + res = @http.request(@request) # Net::HTTPResponse object + end - if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX - #@log.info "ingested ok" - else - @log.warn "FAIL:" - res.value - end - end + rescue + # some http/ssl/other connection error + @log.warn "HTTP ERROR:" + raise + else + @log.on_trace do + @log.trace '>>> RESPONSE' + res.each {|h| @log.trace "#{h}:\t#{res[h]}"} + @log.trace "code\t=> #{res.code}" + @log.trace "msg\t=> #{res.message}" + @log.trace "class\t=> #{res.class}" + @log.trace "body:", res.body + end + + if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX + #@log.info "ingested ok" + else + @log.warn "FAIL:" + res.value + end end + + def ingest(reference, data_chunk) + uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}") + req = Net::HTTP::Post.new(uri) + if @credentials.has_key?('user') + req.basic_auth @credentials['user'], @credentials['password'] + end + + # When using 'application/x-www-form-urlencoded', Ruby encodes with regex + # and it is far too slow. Such POST is legit: + # https://stackoverflow.com/a/14710450 + req.body = data_chunk + req.content_type = 'application/octet-stream' + + @log.on_trace do + @log.trace '>>> REQUEST' + @log.trace "method\t=> #{req.method}" + @log.trace "path\t=> #{req.path}" + @log.trace "uri\t=> #{req.uri}" + @log.trace "body\t=> #{req.body}" + @log.trace "body_stream\t=> #{req.body_stream}" + req.each {|h| @log.trace "#{h}:\t#{req[h]}"} + @log.trace + end + + begin + # TODO keep connection open (so that every new ingest does not do + # full connect again) + res = Net::HTTP.start(uri.hostname, uri.port, + :use_ssl => (uri.scheme == 'https'), + # NOTE = "do not check server cert" + # TODO move this out to conf parameters + :verify_mode => OpenSSL::SSL::VERIFY_NONE, + + # Net::HTTP default open timeout is infinity, which results + # in thread hang forever if other side does not fully + # establish connection. Default read_timeout is 60 seconds. + # We go safe way and make sure all timeouts are defined. + :ssl_timeout => @ssl_timeout, + :open_timeout => @open_timeout, + :read_timeout => @read_timeout, + ) do |http| + http.request(req) + end + + rescue + # some http/ssl/other connection error + @log.warn "HTTP ERROR:" + raise + + else + @log.on_trace do + @log.trace '>>> RESPONSE' + res.each {|h| @log.trace "#{h}:\t#{res[h]}"} + @log.trace "code\t=> #{res.code}" + @log.trace "msg\t=> #{res.message}" + @log.trace "class\t=> #{res.class}" + @log.trace "body:", res.body + end + + if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX + #@log.info "ingested ok" + else + @log.warn "FAIL:" + res.value + end + end + end end