spec/codecs/netflow_spec.rb in logstash-codec-netflow-3.1.4 vs spec/codecs/netflow_spec.rb in logstash-codec-netflow-3.2.0

- old
+ new

@@ -964,7 +964,310 @@ expect(JSON.parse(decode[15].to_json)).to eq(JSON.parse(json_events[0])) end end + context "IPFIX Netscaler with variable length fields" do + let(:data) do + # this ipfix raw data was produced by a Netscaler appliance and captured with wireshark + # select packet bytes were then exported and sort of Pseudonymized to protect corp data + data = [] + data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_tpl.dat"), :mode => "rb") + data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_data.dat"), :mode => "rb") + end + let(:json_events) do + events = [] + events << <<-END + { + "@timestamp": "2016-11-11T12:09:19.000Z", + "netflow": { + "netscalerHttpReqUserAgent": "Mozilla/5.0 (Commodore 64; kobo.com) Gecko/20100101 Firefox/75.0", + "destinationTransportPort": 443, + "netscalerHttpReqCookie": "beer=123456789abcdefghijklmnopqrstuvw; AnotherCookie=1234567890abcdefghijklmnopqr; Shameless.Plug=Thankyou.Rakuten.Kobo.Inc.For.Allowing.me.time.to.work.on.this.and.contribute.back.to.the.community; Padding=aaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbccccccccccccccddddddddddddddddddddddeeeeeeeeeeeeeeeeeeeeeffffffffffffffffffffffgggggggggggggggggggggggghhhhhhhhhhhhhhhhhiiiiiiiiiiiiiiiiiiiiiijjjjjjjjjjjjjjjjjjjjjjjjkkkkkkkkkkkkkkkkkklllllllllllllllmmmmmmmmmm; more=less; GJquote=There.is.no.spoon; GarrySays=Nice!!; LastPadding=aaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbcccccccccccccccccccdddddddddddeeeeeeee", + "flowEndMicroseconds": "503894-10-15T08:48:16.972Z", + "netscalerHttpReqUrl": "/aa/bb/ccccc/ddddddddddddddddddddddddd", + "sourceIPv4Address": "192.168.0.1", + "netscalerHttpReqMethod": "GET", + "netscalerHttpReqHost": "www.kobo.com", + "egressInterface": 2147483651, + "octetDeltaCount": 1541, + "netscalerAppNameAppId": 240189440, + "sourceTransportPort": 51053, + "flowId": 14460661, + "netscalerHttpReqAuthorization": "", + "netscalerHttpDomainName": "www.kobo.com", + "netscalerAaaUsername": "", + "netscalerHttpContentType": "", + "destinationIPv4Address": "10.0.0.1", + "observationPointId": 167954698, + "netscalerHttpReqVia": "1.1 akamai.net(ghost) (AkamaiGHost)", + "netscalerConnectionId": 14460661, + "tcpControlBits": 24, + "flowStartMicroseconds": "503894-10-15T08:48:16.972Z", + "ingressInterface": 8, + "version": 10, + "packetDeltaCount": 2, + "netscalerUnknown330": 0, + "netscalerConnectionChainID": "00e0ed1c9ca80300efb42558596b0800", + "ipVersion": 4, + "protocolIdentifier": 6, + "netscalerHttpResForwLB": 0, + "netscalerHttpReqReferer": "http://www.kobo.com/is-the-best-ebook-company-in-the-world", + "exportingProcessId": 3, + "netscalerAppUnitNameAppId": 239927296, + "netscalerFlowFlags": 84025344, + "netscalerTransactionId": 1068114985, + "netscalerHttpResForwFB": 0, + "netscalerConnectionChainHopCount": 1, + "netscalerHttpReqXForwardedFor": "11.222.33.255" + }, + "@version": "1" + } + END + end + + it "should decode raw data" do + expect(decode.size).to eq(3) + expect(decode[0].get("[netflow][version]")).to eq(10) + expect(decode[0].get("[netflow][sourceIPv4Address]")).to eq('192.168.0.1') + expect(decode[0].get("[netflow][destinationIPv4Address]")).to eq('10.0.0.1') + expect(decode[0].get("[netflow][flowEndMicroseconds]")).to eq('503894-10-15T08:48:16.970Z') + expect(decode[0].get("[netflow][netscalerConnectionId]")).to eq(14460661) + expect(decode[1].get("[netflow][version]")).to eq(10) + expect(decode[1].get("[netflow][flowId]")).to eq(14460662) + expect(decode[1].get("[netflow][observationPointId]")).to eq(167954698) + expect(decode[1].get("[netflow][netscalerFlowFlags]")).to eq(1157636096) + expect(decode[1].get("[netflow][netscalerRoundTripTime]")).to eq(83) + expect(decode[2].get("[netflow][version]")).to eq(10) + expect(decode[2].get("[netflow][netscalerAppUnitNameAppId]")).to eq(239927296) + expect(decode[2].get("[netflow][netscalerHttpReqXForwardedFor]")).to eq('11.222.33.255') + end + + it "should decode variable length fields" do + expect(decode[2].get("[netflow][netscalerHttpReqUrl]")).to eq('/aa/bb/ccccc/ddddddddddddddddddddddddd') + expect(decode[2].get("[netflow][netscalerHttpReqHost]")).to eq('www.kobo.com') + expect(decode[2].get("[netflow][netscalerHttpReqUserAgent]")).to eq('Mozilla/5.0 (Commodore 64; kobo.com) Gecko/20100101 Firefox/75.0') + expect(decode[2].get("[netflow][netscalerHttpReqVia]")).to eq('1.1 akamai.net(ghost) (AkamaiGHost)') + end + + it "should decode fields with more than 255 chars" do + expect(decode[2].get("[netflow][netscalerHttpReqCookie]")).to eq('beer=123456789abcdefghijklmnopqrstuvw; AnotherCookie=1234567890abcdefghijklmnopqr; Shameless.Plug=Thankyou.Rakuten.Kobo.Inc.For.Allowing.me.time.to.work.on.this.and.contribute.back.to.the.community; Padding=aaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbccccccccccccccddddddddddddddddddddddeeeeeeeeeeeeeeeeeeeeeffffffffffffffffffffffgggggggggggggggggggggggghhhhhhhhhhhhhhhhhiiiiiiiiiiiiiiiiiiiiiijjjjjjjjjjjjjjjjjjjjjjjjkkkkkkkkkkkkkkkkkklllllllllllllllmmmmmmmmmm; more=less; GJquote=There.is.no.spoon; GarrySays=Nice!!; LastPadding=aaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbcccccccccccccccccccdddddddddddeeeeeeee') + end + + it "should decode octetarray data" do + expect(decode[0].get("[netflow][netscalerConnectionChainID]")).to eq('00e0ed1c9ca80300efb4255884850600') + end + + it "should serialize to json" do + expect(JSON.parse(decode[2].to_json)).to eq(JSON.parse(json_events[0])) + end + end +end + +describe LogStash::Codecs::Netflow, 'missing templates, no template caching configured' do + subject do + LogStash::Codecs::Netflow.new.tap do |codec| + expect{codec.register}.not_to raise_error + end + end + + let(:logger) { double("logger") } + + before :each do + allow(LogStash::Codecs::Netflow).to receive(:logger).and_return(logger) + allow(logger).to receive(:debug) {} + allow(logger).to receive(:warn) {} + end + + let(:decode) do + [].tap do |events| + data.each { |packet| subject.decode(packet){|event| events << event}} + end + end + + context "IPFIX Netscaler with variable length fields, missing templates" do + let(:data) do + data = [] + data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_data.dat"), :mode => "rb") + end + + it "can not / should not decode any data" do + expect(decode.size).to eq(0) + expect{decode[0].get("[netflow][version]")}.to raise_error(NoMethodError, /undefined method .get. for nil:NilClass/) + expect{JSON.parse(decode[0].to_json)}.to raise_error(JSON::ParserError) + end + + it "should report missing templates" do + expect(logger).to receive(:warn).with(/No matching template for flow id/) + decode[0] + end + end +end + +# New subject with config, ordered testing since we need caching before data processing +describe LogStash::Codecs::Netflow, 'configured with template caching', :order => :defined do + context "IPFIX Netscaler with variable length fields" do + subject do + LogStash::Codecs::Netflow.new(cache_config).tap do |codec| + expect{codec.register}.not_to raise_error + end + end + + let(:tmp_dir) { ENV["TMP"] || ENV["TMPDIR"] || ENV["TEMP"] || "/tmp" } + + let(:cache_config) do + { "cache_save_path" => tmp_dir } + end + + let(:data) do + # this ipfix raw data was produced by a Netscaler appliance and captured with wireshark + # select packet bytes were then exported and sort of Pseudonymized to protect corp data + data = [] + data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_data.dat"), :mode => "rb") + end + + let(:templates) do + templates = [] + templates << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_tpl.dat"), :mode => "rb") + end + + let(:cache) do + [].tap do |events| + templates.each { |packet| subject.decode(packet){|event| events << event}} + end + end + + let(:decode) do + [].tap do |events| + data.each { |packet| subject.decode(packet){|event| events << event}} + end + end + + let(:cached_templates) do + cached_templates = <<-END + { + "0|256": [ + ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"], + ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip4_addr","sourceIPv4Address"],["ip4_addr","destinationIPv4Address"], + ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"], + ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"], + ["uint32","netscalerAppNameAppId"],["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],["uint16","netscalerUnknown330"]], + "0|257": [ + ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"], + ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip4_addr","sourceIPv4Address"],["ip4_addr","destinationIPv4Address"], + ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"], + ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","netscalerRoundTripTime"],["uint32","egressInterface"], + ["uint32","ingressInterface"],["uint32","netscalerAppNameAppId"],["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"], + ["uint16","netscalerUnknown329"],["uint16","netscalerUnknown331"],["uint32","netscalerUnknown332"]], + "0|258": [ + ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"], + ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip4_addr","sourceIPv4Address"],["ip4_addr","destinationIPv4Address"], + ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"], + ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"], + ["uint32","netscalerAppNameAppId"],["uint32","netscalerAppUnitNameAppId"],["uint64","netscalerHttpResForwFB"],["uint64","netscalerHttpResForwLB"], + ["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],["uint16","netscalerUnknown330"], + ["VarString","netscalerAaaUsername"],["VarString","netscalerHttpReqUrl"],["VarString","netscalerHttpReqCookie"],["VarString","netscalerHttpReqReferer"], + ["VarString","netscalerHttpReqMethod"],["VarString","netscalerHttpReqHost"],["VarString","netscalerHttpReqUserAgent"],["VarString","netscalerHttpContentType"], + ["VarString","netscalerHttpReqAuthorization"],["VarString","netscalerHttpReqVia"],["VarString","netscalerHttpReqXForwardedFor"],["VarString","netscalerHttpDomainName"]], + "0|259": [ + ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"], + ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip6_addr","sourceIPv6Address"],["ip6_addr","destinationIPv6Address"], + ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"], + ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"], + ["uint32","netscalerAppNameAppId"],["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],["uint16","netscalerUnknown330"]], + "0|260": [ + ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"], + ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip6_addr","sourceIPv6Address"],["ip6_addr","destinationIPv6Address"], + ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"], + ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","netscalerRoundTripTime"],["uint32","egressInterface"], + ["uint32","ingressInterface"],["uint32","netscalerAppNameAppId"],["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"], + ["uint16","netscalerUnknown329"],["uint16","netscalerUnknown331"],["uint32","netscalerUnknown332"]], + "0|261": [ + ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"], + ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip6_addr","sourceIPv6Address"],["ip6_addr","destinationIPv6Address"], + ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"], + ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"], + ["uint32","netscalerAppNameAppId"],["uint32","netscalerAppUnitNameAppId"],["uint64","netscalerHttpResForwFB"],["uint64","netscalerHttpResForwLB"], + ["OctetArray","netscalerConnectionChainID",{"initial_length":16}],["uint8","netscalerConnectionChainHopCount"],["uint16","netscalerUnknown330"], + ["uint32","netscalerCacheRedirClientConnectionCoreID"],["uint32","netscalerCacheRedirClientConnectionTransactionID"],["VarString","netscalerAaaUsername"], + ["VarString","netscalerHttpReqUrl"],["VarString","netscalerHttpReqCookie"],["VarString","netscalerHttpReqReferer"],["VarString","netscalerHttpReqMethod"], + ["VarString","netscalerHttpReqHost"],["VarString","netscalerHttpReqUserAgent"],["VarString","netscalerHttpContentType"],["VarString","netscalerHttpReqAuthorization"], + ["VarString","netscalerHttpReqVia"],["VarString","netscalerHttpReqXForwardedFor"],["VarString","netscalerHttpDomainName"]], + "0|262": [ + ["uint32","observationPointId"],["uint32","exportingProcessId"],["uint64","flowId"],["uint32","netscalerTransactionId"],["uint32","netscalerConnectionId"], + ["uint8","ipVersion"],["uint8","protocolIdentifier"],["skip",null,{"length":2}],["ip4_addr","sourceIPv4Address"],["ip4_addr","destinationIPv4Address"], + ["uint16","sourceTransportPort"],["uint16","destinationTransportPort"],["uint64","packetDeltaCount"],["uint64","octetDeltaCount"],["uint8","tcpControlBits"], + ["uint64","netscalerFlowFlags"],["uint64","flowStartMicroseconds"],["uint64","flowEndMicroseconds"],["uint32","ingressInterface"],["uint32","egressInterface"], + ["uint16","netscalerHttpRspStatus"],["uint64","netscalerHttpRspLen"],["uint64","netscalerServerTTFB"],["uint64","netscalerServerTTLB"], + ["uint32","netscalerAppNameAppId"],["uint32","netscalerMainPageId"],["uint32","netscalerMainPageCoreId"],["uint64","netscalerHttpReqRcvFB"], + ["uint64","netscalerHttpReqForwFB"],["uint64","netscalerHttpResRcvFB"],["uint64","netscalerHttpReqRcvLB"],["uint64","netscalerHttpReqForwLB"], + ["uint64","netscalerHttpResRcvLB"],["uint32","netscalerClientRTT"],["uint16","netscalerUnknown330"],["uint32","netscalerUnknown347"],["VarString","netscalerAaaUsername"], + ["VarString","netscalerHttpContentType"],["VarString","netscalerHttpResLocation"],["VarString","netscalerHttpResSetCookie"],["VarString","netscalerHttpResSetCookie2"]] + } + END + end + + it "should cache templates" do + expect(cache.size).to eq(0) + expect(JSON.parse(File.read("#{tmp_dir}/ipfix_templates.cache"))).to eq(JSON.parse(cached_templates)) + end + + it "should decode raw data based on cached templates" do + expect(decode.size).to eq(3) + expect(decode[0].get("[netflow][version]")).to eq(10) + expect(decode[0].get("[netflow][flowEndMicroseconds]")).to eq('503894-10-15T08:48:16.970Z') + expect(decode[0].get("[netflow][netscalerConnectionId]")).to eq(14460661) + expect(decode[1].get("[netflow][version]")).to eq(10) + expect(decode[1].get("[netflow][observationPointId]")).to eq(167954698) + expect(decode[1].get("[netflow][netscalerFlowFlags]")).to eq(1157636096) + expect(decode[2].get("[netflow][version]")).to eq(10) + expect(decode[2].get("[netflow][netscalerAppUnitNameAppId]")).to eq(239927296) + expect(decode[2].get("[netflow][netscalerHttpReqXForwardedFor]")).to eq('11.222.33.255') + FileUtils.rm_rf(tmp_dir) + end + end +end + +describe LogStash::Codecs::Netflow, 'configured with include_flowset_id for ipfix' do + subject do + LogStash::Codecs::Netflow.new(include_flowset_id_config).tap do |codec| + expect{codec.register}.not_to raise_error + end + end + + let(:include_flowset_id_config) do + { "include_flowset_id" => true } + end + + let(:decode) do + [].tap do |events| + data.each { |packet| subject.decode(packet){|event| events << event}} + end + end + + let(:data) do + # this ipfix raw data was produced by a Netscaler appliance and captured with wireshark + # select packet bytes were then exported and sort of Pseudonymized to protect corp data + data = [] + data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_tpl.dat"), :mode => "rb") + data << IO.read(File.join(File.dirname(__FILE__), "ipfix_test_netscaler_data.dat"), :mode => "rb") + end + + it "should decode raw data" do + expect(decode.size).to eq(3) + expect(decode[0].get("[netflow][version]")).to eq(10) + expect(decode[0].get("[netflow][flowEndMicroseconds]")).to eq('503894-10-15T08:48:16.970Z') + expect(decode[0].get("[netflow][netscalerConnectionId]")).to eq(14460661) + expect(decode[1].get("[netflow][version]")).to eq(10) + expect(decode[1].get("[netflow][observationPointId]")).to eq(167954698) + expect(decode[1].get("[netflow][netscalerFlowFlags]")).to eq(1157636096) + expect(decode[2].get("[netflow][version]")).to eq(10) + expect(decode[2].get("[netflow][netscalerAppUnitNameAppId]")).to eq(239927296) + end + + it "should include flowset_id" do + expect(decode[0].get("[netflow][flowset_id]")).to eq(258) + expect(decode[1].get("[netflow][flowset_id]")).to eq(257) + expect(decode[2].get("[netflow][flowset_id]")).to eq(258) + end end