lib/lumberjack/beats/client.rb in logstash-input-beats-2.1.4 vs lib/lumberjack/beats/client.rb in logstash-input-beats-2.2.0

- old
+ new

@@ -10,19 +10,26 @@ def initialize(opts={}) @opts = { :port => 0, :addresses => [], :ssl_certificate => nil, + :ssl_certificate_key => nil, + :ssl_certificate_authorities => nil, :ssl => true, :json => false, }.merge(opts) - @opts[:addresses] = [@opts[:addresses]] if @opts[:addresses].class == String + @opts[:addresses] = Array(@opts[:addresses]) raise "Must set a port." if @opts[:port] == 0 raise "Must set atleast one address" if @opts[:addresses].empty? == 0 - raise "Must set a ssl certificate or path" if @opts[:ssl_certificate].nil? && @opts[:ssl] + if @opts[:ssl] + if @opts[:ssl_certificate_authorities].nil? && (@opts[:ssl_certificate].nil? || @opts[:ssl_certificate_key].nil?) + raise "Must set a ssl certificate or path" + end + end + @socket = connect end private def connect @@ -65,40 +72,82 @@ @sequence = 0 @last_ack = 0 @opts = { :port => 0, :address => "127.0.0.1", + :ssl_certificate_authorities => [], # use the same naming as beats' TLS options :ssl_certificate => nil, + :ssl_certificate_key => nil, + :ssl_certificate_password => nil, :ssl => true, :json => false, }.merge(opts) @host = @opts[:address] - connection_start(opts) + connection_start end private - def connection_start(opts) - tcp_socket = TCPSocket.new(opts[:address], opts[:port]) - if !opts[:ssl] + def connection_start + tcp_socket = TCPSocket.new(@opts[:address], @opts[:port]) + + if !@opts[:ssl] @socket = tcp_socket else - certificate = OpenSSL::X509::Certificate.new(File.read(opts[:ssl_certificate])) - certificate_store = OpenSSL::X509::Store.new - certificate_store.add_cert(certificate) + @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, setup_ssl) + @socket.connect + end + end - ssl_context = OpenSSL::SSL::SSLContext.new - ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER - ssl_context.cert_store = certificate_store + private + def setup_ssl + ssl_context = OpenSSL::SSL::SSLContext.new - @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) - @socket.connect + ssl_context.cert = certificate + ssl_context.key = private_key + ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER + ssl_context.cert_store = trust_store + ssl_context + end + + private + def certificate + if @opts[:ssl_certificate] + OpenSSL::X509::Certificate.new(File.open(@opts[:ssl_certificate])) end end - private + private + def private_key + OpenSSL::PKey::RSA.new(File.read(@opts[:ssl_certificate_key]), @opts[:ssl_certificate_password]) if @opts[:ssl_certificate_key] + end + + private + def trust_store + store = OpenSSL::X509::Store.new + + Array(@opts[:ssl_certificate_authorities]).each do |certificate_authority| + if File.file?(certificate_authority) + store.add_file(certificate_authority) + else + # add_path is no implemented under jruby + # so recursively try to load all the certificate from this directory + # https://github.com/jruby/jruby-openssl/blob/master/src/main/java/org/jruby/ext/openssl/X509Store.java#L159 + if !!(RUBY_PLATFORM == "java") + Dir.glob(File.join(certificate_authority, "**", "*")).each { |f| store.add_file(f) } + else + store.add_path(certificate_authority) + end + end + end + + store + end + + + private def inc @sequence = 0 if @sequence + 1 > Lumberjack::Beats::SEQUENCE_MAX @sequence = @sequence + 1 end @@ -132,10 +181,10 @@ send_payload(compress) ack(elements.size) end - private + private def compress_payload(payload) compress = Zlib::Deflate.deflate(payload) ["1", "C", compress.bytesize, compress].pack("AANA*") end