spec/codecs/netflow_spec.rb in logstash-codec-netflow-3.1.4 vs spec/codecs/netflow_spec.rb in logstash-codec-netflow-3.2.0
- old
+ new
@@ -964,7 +964,310 @@
expect(JSON.parse(decode[15].to_json)).to eq(JSON.parse(json_events[0]))
end
end
+ context "IPFIX Netscaler with variable length fields" do
+ let(:data) do
+ # this ipfix raw data was produced by a Netscaler appliance and captured with wireshark
+ # select packet bytes were then exported and sort of Pseudonymized to protect corp data
+ data = []
+ data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_tpl.dat"), :mode => "rb")
+ data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_data.dat"), :mode => "rb")
+ end
+ let(:json_events) do
+ events = []
+ events << <<-END
+ {
+ "@timestamp": "2016-11-11T12:09:19.000Z",
+ "netflow": {
+ "netscalerHttpReqUserAgent": "Mozilla/5.0 (Commodore 64; kobo.com) Gecko/20100101 Firefox/75.0",
+ "destinationTransportPort": 443,
+ "netscalerHttpReqCookie": "beer=123456789abcdefghijklmnopqrstuvw; AnotherCookie=1234567890abcdefghijklmnopqr; Shameless.Plug=Thankyou.Rakuten.Kobo.Inc.For.Allowing.me.time.to.work.on.this.and.contribute.back.to.the.community; Padding=aaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbccccccccccccccddddddddddddddddddddddeeeeeeeeeeeeeeeeeeeeeffffffffffffffffffffffgggggggggggggggggggggggghhhhhhhhhhhhhhhhhiiiiiiiiiiiiiiiiiiiiiijjjjjjjjjjjjjjjjjjjjjjjjkkkkkkkkkkkkkkkkkklllllllllllllllmmmmmmmmmm; more=less; GJquote=There.is.no.spoon; GarrySays=Nice!!; LastPadding=aaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbcccccccccccccccccccdddddddddddeeeeeeee",
+ "flowEndMicroseconds": "503894-10-15T08:48:16.972Z",
+ "netscalerHttpReqUrl": "/aa/bb/ccccc/ddddddddddddddddddddddddd",
+ "sourceIPv4Address": "192.168.0.1",
+ "netscalerHttpReqMethod": "GET",
+ "netscalerHttpReqHost": "www.kobo.com",
+ "egressInterface": 2147483651,
+ "octetDeltaCount": 1541,
+ "netscalerAppNameAppId": 240189440,
+ "sourceTransportPort": 51053,
+ "flowId": 14460661,
+ "netscalerHttpReqAuthorization": "",
+ "netscalerHttpDomainName": "www.kobo.com",
+ "netscalerAaaUsername": "",
+ "netscalerHttpContentType": "",
+ "destinationIPv4Address": "10.0.0.1",
+ "observationPointId": 167954698,
+ "netscalerHttpReqVia": "1.1 akamai.net(ghost) (AkamaiGHost)",
+ "netscalerConnectionId": 14460661,
+ "tcpControlBits": 24,
+ "flowStartMicroseconds": "503894-10-15T08:48:16.972Z",
+ "ingressInterface": 8,
+ "version": 10,
+ "packetDeltaCount": 2,
+ "netscalerUnknown330": 0,
+ "netscalerConnectionChainID": "00e0ed1c9ca80300efb42558596b0800",
+ "ipVersion": 4,
+ "protocolIdentifier": 6,
+ "netscalerHttpResForwLB": 0,
+ "netscalerHttpReqReferer": "http://www.kobo.com/is-the-best-ebook-company-in-the-world",
+ "exportingProcessId": 3,
+ "netscalerAppUnitNameAppId": 239927296,
+ "netscalerFlowFlags": 84025344,
+ "netscalerTransactionId": 1068114985,
+ "netscalerHttpResForwFB": 0,
+ "netscalerConnectionChainHopCount": 1,
+ "netscalerHttpReqXForwardedFor": "11.222.33.255"
+ },
+ "@version": "1"
+ }
+ END
+ end
+
+ it "should decode raw data" do
+ expect(decode.size).to eq(3)
+ expect(decode[0].get("[netflow][version]")).to eq(10)
+ expect(decode[0].get("[netflow][sourceIPv4Address]")).to eq('192.168.0.1')
+ expect(decode[0].get("[netflow][destinationIPv4Address]")).to eq('10.0.0.1')
+ expect(decode[0].get("[netflow][flowEndMicroseconds]")).to eq('503894-10-15T08:48:16.970Z')
+ expect(decode[0].get("[netflow][netscalerConnectionId]")).to eq(14460661)
+ expect(decode[1].get("[netflow][version]")).to eq(10)
+ expect(decode[1].get("[netflow][flowId]")).to eq(14460662)
+ expect(decode[1].get("[netflow][observationPointId]")).to eq(167954698)
+ expect(decode[1].get("[netflow][netscalerFlowFlags]")).to eq(1157636096)
+ expect(decode[1].get("[netflow][netscalerRoundTripTime]")).to eq(83)
+ expect(decode[2].get("[netflow][version]")).to eq(10)
+ expect(decode[2].get("[netflow][netscalerAppUnitNameAppId]")).to eq(239927296)
+ expect(decode[2].get("[netflow][netscalerHttpReqXForwardedFor]")).to eq('11.222.33.255')
+ end
+
+ it "should decode variable length fields" do
+ expect(decode[2].get("[netflow][netscalerHttpReqUrl]")).to eq('/aa/bb/ccccc/ddddddddddddddddddddddddd')
+ expect(decode[2].get("[netflow][netscalerHttpReqHost]")).to eq('www.kobo.com')
+ expect(decode[2].get("[netflow][netscalerHttpReqUserAgent]")).to eq('Mozilla/5.0 (Commodore 64; kobo.com) Gecko/20100101 Firefox/75.0')
+ expect(decode[2].get("[netflow][netscalerHttpReqVia]")).to eq('1.1 akamai.net(ghost) (AkamaiGHost)')
+ end
+
+ it "should decode fields with more than 255 chars" do
+ expect(decode[2].get("[netflow][netscalerHttpReqCookie]")).to eq('beer=123456789abcdefghijklmnopqrstuvw; AnotherCookie=1234567890abcdefghijklmnopqr; Shameless.Plug=Thankyou.Rakuten.Kobo.Inc.For.Allowing.me.time.to.work.on.this.and.contribute.back.to.the.community; Padding=aaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbccccccccccccccddddddddddddddddddddddeeeeeeeeeeeeeeeeeeeeeffffffffffffffffffffffgggggggggggggggggggggggghhhhhhhhhhhhhhhhhiiiiiiiiiiiiiiiiiiiiiijjjjjjjjjjjjjjjjjjjjjjjjkkkkkkkkkkkkkkkkkklllllllllllllllmmmmmmmmmm; more=less; GJquote=There.is.no.spoon; GarrySays=Nice!!; LastPadding=aaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbcccccccccccccccccccdddddddddddeeeeeeee')
+ end
+
+ it "should decode octetarray data" do
+ expect(decode[0].get("[netflow][netscalerConnectionChainID]")).to eq('00e0ed1c9ca80300efb4255884850600')
+ end
+
+ it "should serialize to json" do
+ expect(JSON.parse(decode[2].to_json)).to eq(JSON.parse(json_events[0]))
+ end
+ end
+end
+
+describe LogStash::Codecs::Netflow, 'missing templates, no template caching configured' do
+ subject do
+ LogStash::Codecs::Netflow.new.tap do |codec|
+ expect{codec.register}.not_to raise_error
+ end
+ end
+
+ let(:logger) { double("logger") }
+
+ before :each do
+ allow(LogStash::Codecs::Netflow).to receive(:logger).and_return(logger)
+ allow(logger).to receive(:debug) {}
+ allow(logger).to receive(:warn) {}
+ end
+
+ let(:decode) do
+ [].tap do |events|
+ data.each { |packet| subject.decode(packet){|event| events << event}}
+ end
+ end
+
+ context "IPFIX Netscaler with variable length fields, missing templates" do
+ let(:data) do
+ data = []
+ data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_data.dat"), :mode => "rb")
+ end
+
+ it "can not / should not decode any data" do
+ expect(decode.size).to eq(0)
+ expect{decode[0].get("[netflow][version]")}.to raise_error(NoMethodError, /undefined method .get. for nil:NilClass/)
+ expect{JSON.parse(decode[0].to_json)}.to raise_error(JSON::ParserError)
+ end
+
+ it "should report missing templates" do
+ expect(logger).to receive(:warn).with(/No matching template for flow id/)
+ decode[0]
+ end
+ end
+end
+
+# New subject with config, ordered testing since we need caching before data processing
+describe LogStash::Codecs::Netflow, 'configured with template caching', :order => :defined do
+ context "IPFIX Netscaler with variable length fields" do
+ subject do
+ LogStash::Codecs::Netflow.new(cache_config).tap do |codec|
+ expect{codec.register}.not_to raise_error
+ end
+ end
+
+ let(:tmp_dir) { ENV["TMP"] || ENV["TMPDIR"] || ENV["TEMP"] || "/tmp" }
+
+ let(:cache_config) do
+ { "cache_save_path" => tmp_dir }
+ end
+
+ let(:data) do
+ # this ipfix raw data was produced by a Netscaler appliance and captured with wireshark
+ # select packet bytes were then exported and sort of Pseudonymized to protect corp data
+ data = []
+ data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_data.dat"), :mode => "rb")
+ end
+
+ let(:templates) do
+ templates = []
+ templates << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_tpl.dat"), :mode => "rb")
+ end
+
+ let(:cache) do
+ [].tap do |events|
+ templates.each { |packet| subject.decode(packet){|event| events << event}}
+ end
+ end
+
+ let(:decode) do
+ [].tap do |events|
+ data.each { |packet| subject.decode(packet){|event| events << event}}
+ end
+ end
+
+ let(:cached_templates) do
+ cached_templates = <<-END
+ {
+ "0|256": [
+ ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"],
+ ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip4_addr","sourceIPv4Address"],["ip4_addr","destinationIPv4Address"],
+ ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"],
+ ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"],
+ ["uint32","netscalerAppNameAppId"],["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],["uint16","netscalerUnknown330"]],
+ "0|257": [
+ ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"],
+ ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip4_addr","sourceIPv4Address"],["ip4_addr","destinationIPv4Address"],
+ ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"],
+ ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","netscalerRoundTripTime"],["uint32","egressInterface"],
+ ["uint32","ingressInterface"],["uint32","netscalerAppNameAppId"],["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],
+ ["uint16","netscalerUnknown329"],["uint16","netscalerUnknown331"],["uint32","netscalerUnknown332"]],
+ "0|258": [
+ ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"],
+ ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip4_addr","sourceIPv4Address"],["ip4_addr","destinationIPv4Address"],
+ ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"],
+ ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"],
+ ["uint32","netscalerAppNameAppId"],["uint32","netscalerAppUnitNameAppId"],["uint64","netscalerHttpResForwFB"],["uint64","netscalerHttpResForwLB"],
+ ["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],["uint16","netscalerUnknown330"],
+ ["VarString","netscalerAaaUsername"],["VarString","netscalerHttpReqUrl"],["VarString","netscalerHttpReqCookie"],["VarString","netscalerHttpReqReferer"],
+ ["VarString","netscalerHttpReqMethod"],["VarString","netscalerHttpReqHost"],["VarString","netscalerHttpReqUserAgent"],["VarString","netscalerHttpContentType"],
+ ["VarString","netscalerHttpReqAuthorization"],["VarString","netscalerHttpReqVia"],["VarString","netscalerHttpReqXForwardedFor"],["VarString","netscalerHttpDomainName"]],
+ "0|259": [
+ ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"],
+ ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip6_addr","sourceIPv6Address"],["ip6_addr","destinationIPv6Address"],
+ ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"],
+ ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"],
+ ["uint32","netscalerAppNameAppId"],["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],["uint16","netscalerUnknown330"]],
+ "0|260": [
+ ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"],
+ ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip6_addr","sourceIPv6Address"],["ip6_addr","destinationIPv6Address"],
+ ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"],
+ ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","netscalerRoundTripTime"],["uint32","egressInterface"],
+ ["uint32","ingressInterface"],["uint32","netscalerAppNameAppId"],["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],
+ ["uint16","netscalerUnknown329"],["uint16","netscalerUnknown331"],["uint32","netscalerUnknown332"]],
+ "0|261": [
+ ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"],
+ ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip6_addr","sourceIPv6Address"],["ip6_addr","destinationIPv6Address"],
+ ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"],
+ ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"],
+ ["uint32","netscalerAppNameAppId"],["uint32","netscalerAppUnitNameAppId"],["uint64","netscalerHttpResForwFB"],["uint64","netscalerHttpResForwLB"],
+ ["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],["uint16","netscalerUnknown330"],
+ ["uint32","netscalerCacheRedirClientConnectionCoreID"],["uint32","netscalerCacheRedirClientConnectionTransactionID"],["VarString","netscalerAaaUsername"],
+ ["VarString","netscalerHttpReqUrl"],["VarString","netscalerHttpReqCookie"],["VarString","netscalerHttpReqReferer"],["VarString","netscalerHttpReqMethod"],
+ ["VarString","netscalerHttpReqHost"],["VarString","netscalerHttpReqUserAgent"],["VarString","netscalerHttpContentType"],["VarString","netscalerHttpReqAuthorization"],
+ ["VarString","netscalerHttpReqVia"],["VarString","netscalerHttpReqXForwardedFor"],["VarString","netscalerHttpDomainName"]],
+ "0|262": [
+ ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"],
+ ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip4_addr","sourceIPv4Address"],["ip4_addr","destinationIPv4Address"],
+ ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"],
+ ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"],
+ ["uint16","netscalerHttpRspStatus"],["uint64","netscalerHttpRspLen"],["uint64","netscalerServerTTFB"],["uint64","netscalerServerTTLB"],
+ ["uint32","netscalerAppNameAppId"],["uint32","netscalerMainPageId"],["uint32","netscalerMainPageCoreId"],["uint64","netscalerHttpReqRcvFB"],
+ ["uint64","netscalerHttpReqForwFB"],["uint64","netscalerHttpResRcvFB"],["uint64","netscalerHttpReqRcvLB"],["uint64","netscalerHttpReqForwLB"],
+ ["uint64","netscalerHttpResRcvLB"],["uint32","netscalerClientRTT"],["uint16","netscalerUnknown330"],["uint32","netscalerUnknown347"],["VarString","netscalerAaaUsername"],
+ ["VarString","netscalerHttpContentType"],["VarString","netscalerHttpResLocation"],["VarString","netscalerHttpResSetCookie"],["VarString","netscalerHttpResSetCookie2"]]
+ }
+ END
+ end
+
+ it "should cache templates" do
+ expect(cache.size).to eq(0)
+ expect(JSON.parse(File.read("#{tmp_dir}/ipfix_templates.cache"))).to eq(JSON.parse(cached_templates))
+ end
+
+ it "should decode raw data based on cached templates" do
+ expect(decode.size).to eq(3)
+ expect(decode[0].get("[netflow][version]")).to eq(10)
+ expect(decode[0].get("[netflow][flowEndMicroseconds]")).to eq('503894-10-15T08:48:16.970Z')
+ expect(decode[0].get("[netflow][netscalerConnectionId]")).to eq(14460661)
+ expect(decode[1].get("[netflow][version]")).to eq(10)
+ expect(decode[1].get("[netflow][observationPointId]")).to eq(167954698)
+ expect(decode[1].get("[netflow][netscalerFlowFlags]")).to eq(1157636096)
+ expect(decode[2].get("[netflow][version]")).to eq(10)
+ expect(decode[2].get("[netflow][netscalerAppUnitNameAppId]")).to eq(239927296)
+ expect(decode[2].get("[netflow][netscalerHttpReqXForwardedFor]")).to eq('11.222.33.255')
+ FileUtils.rm_rf(tmp_dir)
+ end
+ end
+end
+
+describe LogStash::Codecs::Netflow, 'configured with include_flowset_id for ipfix' do
+ subject do
+ LogStash::Codecs::Netflow.new(include_flowset_id_config).tap do |codec|
+ expect{codec.register}.not_to raise_error
+ end
+ end
+
+ let(:include_flowset_id_config) do
+ { "include_flowset_id" => true }
+ end
+
+ let(:decode) do
+ [].tap do |events|
+ data.each { |packet| subject.decode(packet){|event| events << event}}
+ end
+ end
+
+ let(:data) do
+ # this ipfix raw data was produced by a Netscaler appliance and captured with wireshark
+ # select packet bytes were then exported and sort of Pseudonymized to protect corp data
+ data = []
+ data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_tpl.dat"), :mode => "rb")
+ data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_data.dat"), :mode => "rb")
+ end
+
+ it "should decode raw data" do
+ expect(decode.size).to eq(3)
+ expect(decode[0].get("[netflow][version]")).to eq(10)
+ expect(decode[0].get("[netflow][flowEndMicroseconds]")).to eq('503894-10-15T08:48:16.970Z')
+ expect(decode[0].get("[netflow][netscalerConnectionId]")).to eq(14460661)
+ expect(decode[1].get("[netflow][version]")).to eq(10)
+ expect(decode[1].get("[netflow][observationPointId]")).to eq(167954698)
+ expect(decode[1].get("[netflow][netscalerFlowFlags]")).to eq(1157636096)
+ expect(decode[2].get("[netflow][version]")).to eq(10)
+ expect(decode[2].get("[netflow][netscalerAppUnitNameAppId]")).to eq(239927296)
+ end
+
+ it "should include flowset_id" do
+ expect(decode[0].get("[netflow][flowset_id]")).to eq(258)
+ expect(decode[1].get("[netflow][flowset_id]")).to eq(257)
+ expect(decode[2].get("[netflow][flowset_id]")).to eq(258)
+ end
end