lib/lumberjack/beats/client.rb in logstash-input-beats-2.1.4 vs lib/lumberjack/beats/client.rb in logstash-input-beats-2.2.0
- old
+ new
@@ -10,19 +10,26 @@
def initialize(opts={})
@opts = {
:port => 0,
:addresses => [],
:ssl_certificate => nil,
+ :ssl_certificate_key => nil,
+ :ssl_certificate_authorities => nil,
:ssl => true,
:json => false,
}.merge(opts)
- @opts[:addresses] = [@opts[:addresses]] if @opts[:addresses].class == String
+ @opts[:addresses] = Array(@opts[:addresses])
raise "Must set a port." if @opts[:port] == 0
raise "Must set atleast one address" if @opts[:addresses].empty? == 0
- raise "Must set a ssl certificate or path" if @opts[:ssl_certificate].nil? && @opts[:ssl]
+ if @opts[:ssl]
+ if @opts[:ssl_certificate_authorities].nil? && (@opts[:ssl_certificate].nil? || @opts[:ssl_certificate_key].nil?)
+ raise "Must set a ssl certificate or path"
+ end
+ end
+
@socket = connect
end
private
def connect
@@ -65,40 +72,82 @@
@sequence = 0
@last_ack = 0
@opts = {
:port => 0,
:address => "127.0.0.1",
+ :ssl_certificate_authorities => [], # use the same naming as beats' TLS options
:ssl_certificate => nil,
+ :ssl_certificate_key => nil,
+ :ssl_certificate_password => nil,
:ssl => true,
:json => false,
}.merge(opts)
@host = @opts[:address]
- connection_start(opts)
+ connection_start
end
private
- def connection_start(opts)
- tcp_socket = TCPSocket.new(opts[:address], opts[:port])
- if !opts[:ssl]
+ def connection_start
+ tcp_socket = TCPSocket.new(@opts[:address], @opts[:port])
+
+ if !@opts[:ssl]
@socket = tcp_socket
else
- certificate = OpenSSL::X509::Certificate.new(File.read(opts[:ssl_certificate]))
- certificate_store = OpenSSL::X509::Store.new
- certificate_store.add_cert(certificate)
+ @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, setup_ssl)
+ @socket.connect
+ end
+ end
- ssl_context = OpenSSL::SSL::SSLContext.new
- ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
- ssl_context.cert_store = certificate_store
+ private
+ def setup_ssl
+ ssl_context = OpenSSL::SSL::SSLContext.new
- @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
- @socket.connect
+ ssl_context.cert = certificate
+ ssl_context.key = private_key
+ ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
+ ssl_context.cert_store = trust_store
+ ssl_context
+ end
+
+ private
+ def certificate
+ if @opts[:ssl_certificate]
+ OpenSSL::X509::Certificate.new(File.open(@opts[:ssl_certificate]))
end
end
- private
+ private
+ def private_key
+ OpenSSL::PKey::RSA.new(File.read(@opts[:ssl_certificate_key]), @opts[:ssl_certificate_password]) if @opts[:ssl_certificate_key]
+ end
+
+ private
+ def trust_store
+ store = OpenSSL::X509::Store.new
+
+ Array(@opts[:ssl_certificate_authorities]).each do |certificate_authority|
+ if File.file?(certificate_authority)
+ store.add_file(certificate_authority)
+ else
+ # add_path is no implemented under jruby
+ # so recursively try to load all the certificate from this directory
+ # https://github.com/jruby/jruby-openssl/blob/master/src/main/java/org/jruby/ext/openssl/X509Store.java#L159
+ if !!(RUBY_PLATFORM == "java")
+ Dir.glob(File.join(certificate_authority, "**", "*")).each { |f| store.add_file(f) }
+ else
+ store.add_path(certificate_authority)
+ end
+ end
+ end
+
+ store
+ end
+
+
+ private
def inc
@sequence = 0 if @sequence + 1 > Lumberjack::Beats::SEQUENCE_MAX
@sequence = @sequence + 1
end
@@ -132,10 +181,10 @@
send_payload(compress)
ack(elements.size)
end
- private
+ private
def compress_payload(payload)
compress = Zlib::Deflate.deflate(payload)
["1", "C", compress.bytesize, compress].pack("AANA*")
end