lib/fluent/plugin/wendelin_client.rb in fluent-plugin-wendelin-0.3 vs lib/fluent/plugin/wendelin_client.rb in fluent-plugin-wendelin-0.4
- old
+ new
@@ -13,93 +13,176 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
require 'net/http'
require 'openssl'
-
# class representing a Wendelin client
class WendelinClient
# `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint"
# `credentials` # {'user' => _, 'password' => _} TODO change to certificate
# `log` - logger to use
- def initialize(streamtool_uri, credentials, log)
- @streamtool_uri = streamtool_uri
- @credentials = credentials
- @log = log
+ def initialize(streamtool_uri, credentials, log,
+ ssl_timeout, open_timeout, read_timeout, keep_alive_timeout)
+ @streamtool_uri = streamtool_uri
+ @credentials = credentials
+ @log = log
+ @ssl_timeout = ssl_timeout
+ @open_timeout = open_timeout
+ @read_timeout = read_timeout
+ @keep_alive_timeout = keep_alive_timeout
end
+ # start request in an independent function to keep the connection open
+ def start_connection(uri)
+
+ @log.debug "start new connection"
+
+ @http = Net::HTTP.start(uri.hostname, uri.port,
+ :use_ssl => (uri.scheme == 'https'),
+ :verify_mode => OpenSSL::SSL::VERIFY_NONE,
+
+ # Net::HTTP default open timeout is infinity, which results
+ # in thread hang forever if other side does not fully
+ # establish connection. Default read_timeout is 60 seconds.
+ # We go safe way and make sure all timeouts are defined.
+ :ssl_timeout => @ssl_timeout,
+ :open_timeout => @open_timeout,
+ :read_timeout => @read_timeout,
+ :keep_alive_timeout => @keep_alive_timeout,)
+ end
+
# ingest `data_chunk` to a stream referenced as `reference`
- def ingest(reference, data_chunk)
- uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
- req = Net::HTTP::Post.new(uri)
- if @credentials.has_key?('user')
- req.basic_auth @credentials['user'], @credentials['password']
- end
+ def ingest_with_keep_alive(reference, data_chunk)
+ uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
+ # call start_connection if http is undefined
+ if ! defined? @http
+ start_connection(uri)
+ end
- # When using 'application/x-www-form-urlencoded', Ruby encodes with regex
- # and it is far too slow. Such POST is legit:
- # https://stackoverflow.com/a/14710450
- req.body = data_chunk
- req.content_type = 'application/octet-stream'
+ # connect again if the connection is not started
+ if ! @http.started?()
+ start_connection(uri)
+ end
- @log.on_trace do
- @log.trace '>>> REQUEST'
- @log.trace "method\t=> #{req.method}"
- @log.trace "path\t=> #{req.path}"
- @log.trace "uri\t=> #{req.uri}"
- @log.trace "body\t=> #{req.body}"
- @log.trace "body_stream\t=> #{req.body_stream}"
- req.each {|h| @log.trace "#{h}:\t#{req[h]}"}
- @log.trace
- end
+ @request = Net::HTTP::Post.new(uri)
- begin
- # TODO keep connection open (so that every new ingest does not do
- # full connect again)
- res = Net::HTTP.start(uri.hostname, uri.port,
- :use_ssl => (uri.scheme == 'https'),
- # NOTE = "do not check server cert"
- # TODO move this out to conf parameters
- :verify_mode => OpenSSL::SSL::VERIFY_NONE,
+ # When using 'application/x-www-form-urlencoded', Ruby encodes with regex
+ # and it is far too slow. Such POST is legit:
+ # https://stackoverflow.com/a/14710450
+ @request.body = data_chunk
+ @request.content_type = 'application/octet-stream'
- # Net::HTTP default open timeout is infinity, which results
- # in thread hang forever if other side does not fully
- # establish connection. Default read_timeout is 60 seconds.
- # We go safe way and make sure all timeouts are defined.
- :ssl_timeout => 60,
- :open_timeout => 60,
- :read_timeout => 60,
- ) do |http|
- http.request(req)
- end
+ if @credentials.has_key?('user')
+ @request.basic_auth @credentials['user'], @credentials['password']
+ end
- rescue
- # some http/ssl/other connection error
- @log.warn "HTTP ERROR:"
- raise
+ @log.on_trace do
+ @log.trace '>>> REQUEST'
+ @log.trace "method\t=> #{@request.method}"
+ @log.trace "path\t=> #{@request.path}"
+ @log.trace "uri\t=> #{@request.uri}"
+ @log.trace "body\t=> #{@request.body}"
+ @log.trace "body_stream\t=> #{@request.body_stream}"
+ @request.each {|h| @log.trace "#{h}:\t#{@request[h]}"}
+ @log.trace
+ end
- else
- @log.on_trace do
- @log.trace '>>> RESPONSE'
- res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
- @log.trace "code\t=> #{res.code}"
- @log.trace "msg\t=> #{res.message}"
- @log.trace "class\t=> #{res.class}"
- @log.trace "body:", res.body
- end
+ begin
+ res = @http.request(@request) # Net::HTTPResponse object
+ end
- if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
- #@log.info "ingested ok"
- else
- @log.warn "FAIL:"
- res.value
- end
- end
+ rescue
+ # some http/ssl/other connection error
+ @log.warn "HTTP ERROR:"
+ raise
+ else
+ @log.on_trace do
+ @log.trace '>>> RESPONSE'
+ res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
+ @log.trace "code\t=> #{res.code}"
+ @log.trace "msg\t=> #{res.message}"
+ @log.trace "class\t=> #{res.class}"
+ @log.trace "body:", res.body
+ end
+
+ if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
+ #@log.info "ingested ok"
+ else
+ @log.warn "FAIL:"
+ res.value
+ end
end
+
+ def ingest(reference, data_chunk)
+ uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}")
+ req = Net::HTTP::Post.new(uri)
+ if @credentials.has_key?('user')
+ req.basic_auth @credentials['user'], @credentials['password']
+ end
+
+ # When using 'application/x-www-form-urlencoded', Ruby encodes with regex
+ # and it is far too slow. Such POST is legit:
+ # https://stackoverflow.com/a/14710450
+ req.body = data_chunk
+ req.content_type = 'application/octet-stream'
+
+ @log.on_trace do
+ @log.trace '>>> REQUEST'
+ @log.trace "method\t=> #{req.method}"
+ @log.trace "path\t=> #{req.path}"
+ @log.trace "uri\t=> #{req.uri}"
+ @log.trace "body\t=> #{req.body}"
+ @log.trace "body_stream\t=> #{req.body_stream}"
+ req.each {|h| @log.trace "#{h}:\t#{req[h]}"}
+ @log.trace
+ end
+
+ begin
+ # TODO keep connection open (so that every new ingest does not do
+ # full connect again)
+ res = Net::HTTP.start(uri.hostname, uri.port,
+ :use_ssl => (uri.scheme == 'https'),
+ # NOTE = "do not check server cert"
+ # TODO move this out to conf parameters
+ :verify_mode => OpenSSL::SSL::VERIFY_NONE,
+
+ # Net::HTTP default open timeout is infinity, which results
+ # in thread hang forever if other side does not fully
+ # establish connection. Default read_timeout is 60 seconds.
+ # We go safe way and make sure all timeouts are defined.
+ :ssl_timeout => @ssl_timeout,
+ :open_timeout => @open_timeout,
+ :read_timeout => @read_timeout,
+ ) do |http|
+ http.request(req)
+ end
+
+ rescue
+ # some http/ssl/other connection error
+ @log.warn "HTTP ERROR:"
+ raise
+
+ else
+ @log.on_trace do
+ @log.trace '>>> RESPONSE'
+ res.each {|h| @log.trace "#{h}:\t#{res[h]}"}
+ @log.trace "code\t=> #{res.code}"
+ @log.trace "msg\t=> #{res.message}"
+ @log.trace "class\t=> #{res.class}"
+ @log.trace "body:", res.body
+ end
+
+ if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX
+ #@log.info "ingested ok"
+ else
+ @log.warn "FAIL:"
+ res.value
+ end
+ end
+ end
end