# coding: utf-8
# This file is part of PacketGen
# See https://github.com/sdaubert/packetgen for more informations
# Copyright (C) 2016 Sylvain Daubert
# This program is published under MIT license.
# frozen_string_literal: true

require 'pcaprub'

module PacketGen
  # An object of type {Packet} handles a network packet. This packet may contain
  # multiple protocol headers, starting from MAC layer or from Network (OSI) layer.
  #
  # Creating a packet is fairly simple:
  #  Packet.gen 'IP', src: '192.168.1.1', dst: '192.168.1.2'
  #
  # == Create a packet
  # Packets may be hand-made or parsed from a binary string:
  #  Packet.gen('IP', src: '192.168.1.1', dst: '192.168.1.2').add('UDP', sport: 45000, dport: 23)
  #  Packet.parse(binary_string)
  #
  # == Access packet information
  #  pkt = Packet.gen('IP').add('UDP')
  #  # read information
  #  pkt.udp.sport
  #  pkt.ip.ttl
  #  # set information
  #  pkt.udp.dport = 2323
  #  pkt.ip.ttl = 1
  #  pkt.ip(ttl: 1, id: 1234)
  #
  # == Save a packet to a file
  #  pkt.write('file.pcapng')
  #
  # == Get packets
  # Packets may be captured from wire:
  #  Packet.capture do |packet|
  #    do_some_stuffs
  #  end
  #  packets = Packet.capture(iface: 'eth0', max: 5)  # get 5 packets from eth0
  #
  # Packets may also be read from a file:
  #  packets = Packet.read('file.pcapng')
  #
  # == Save packets to a file
  #  Packet.write 'file.pcapng', packets
  #
  # @since 2.0.0
  #
  #   Packet accessor has changed. When header class is in a namespace
  #   (for example Dot11::* header classes), to avoid clashes in names, such
  #   accessors are named +namespace_class+. For example {Header::Dot11::Data}
  #   header is now accessed through +Packet#dot11_data+ and no longer through
  #   +Packet#data+.
  # @author Sylvain Daubert
  class Packet
    # Headers stacked in this packet, lowest layer first.
    # {#encapsulate} reads this from another packet.
    # @return [Array<Header::Base>]
    attr_reader :headers

    # Create a new packet starting with a +protocol+ header.
    # Shortcut for +Packet.new.add(protocol, options)+.
    # @param [String] protocol base protocol for packet
    # @param [Hash] options specific options for +protocol+
    # @return [Packet]
    # @raise [ArgumentError] unknown protocol
    def self.gen(protocol, options={})
      new.add protocol, options
    end

    # Parse a binary string and generate a packet from it.
    # Shortcut for +Packet.new.parse(binary_str, first_header: first_header)+.
    # @param [String] binary_str
    # @param [String,nil] first_header First protocol header. +nil+ means discover it!
    # @return [Packet]
    # @raise [ArgumentError] +first_header+ is an unknown header
    def self.parse(binary_str, first_header: nil)
      new.parse binary_str, first_header: first_header
    end

    # Capture packets from wire. +options+ are handed over to +Capture.new+.
    # If a block is given, each captured packet is yielded to it.
    # @param [Hash] options
    # @yieldparam [Packet] packet
    # @return [Array<Packet>] captured packets
    def self.capture(options={})
      capture = Capture.new(options)
      if block_given?
        capture.start { |packet| yield packet }
      else
        capture.start
      end
      capture.packets
    end

    # Read packets from +filename+. May read Pcap and Pcap-NG formats.
    #
    # The file is first read as PcapNG. On failure, and only if +filename+ has a
    # +.pcap+ extension, it is read again through +PCAPRUB::Pcap+.
    #
    # For more control, see {PcapNG::File} or +PCAPRUB::Pcap+.
    # @param [String] filename PcapNG or Pcap file.
    # @return [Array<Packet>]
    # @raise [ArgumentError] PcapNG reading failed and +filename+ is not a +.pcap+ file
    # @author Sylvain Daubert
    # @author Kent Gruber
    def self.read(filename)
      PcapNG::File.new.read_packets filename
    rescue StandardError => e
      raise ArgumentError, e.message unless File.extname(filename.downcase) == '.pcap'

      packets = []
      PCAPRUB::Pcap.open_offline(filename).each_packet do |raw|
        # frames for which PacketGen.parse returns nil/false are skipped
        parsed = PacketGen.parse(raw.to_s)
        packets << parsed if parsed
      end
      packets
    end

    # Write packets to +filename+
    #
    # For more options, see {PcapNG::File}.
    # @param [String] filename
    # @param [Array<Packet>] packets packets to write
    # @return [void]
    def self.write(filename, packets)
      pf = PcapNG::File.new
      pf.array_to_file packets
      pf.to_f filename
    end

    # @private
    def initialize
      @headers = []
    end

    # Add a protocol on packet stack
    # @param [String] protocol
    # @param [Hash] options protocol specific options
    # @return [self]
    # @raise [ArgumentError] unknown protocol
    def add(protocol, options={})
      klass = check_protocol(protocol)

      header = klass.new(options)
      add_header header
      self
    end

    # Insert a header in packet
    # @param [Header] prev header after which insert new one
    # @param [String] protocol protocol to insert
    # @param [Hash] options protocol specific options
    # @return [self]
    # @raise [ArgumentError] unknown protocol
    # @raise [FormatError] +prev+ is not a header from this packet
    def insert(prev, protocol, options={})
      klass = check_protocol(protocol)

      # Locate +prev+ before touching anything, so that a foreign header does
      # not leave the packet half-modified.
      prev_idx = @headers.index(prev)
      raise FormatError, 'header not in packet!' if prev_idx.nil?

      # Remember what +prev+ carried: it becomes the body of the new header.
      nxt = prev.body
      header = klass.new(options)
      # With previous_header set, add_header binds +header+ to +prev+ but does
      # not append it to @headers: it is spliced in just after +prev+ below.
      add_header header, previous_header: prev
      @headers[prev_idx + 1, 0] = header
      header[:body] = nxt
      self
    end

    # Check if a protocol header is embedded in packet
    # @param [String] protocol
    # @return [Boolean]
    # @raise [ArgumentError] unknown protocol
    def is?(protocol)
      klass = check_protocol protocol
      @headers.any? { |h| h.is_a? klass }
    end

    # Recalculate all packet checksums, from last header to first one.
    # Headers not responding to +#calc_checksum+ are skipped.
    # @return [void]
    def calc_checksum
      @headers.reverse_each do |header|
        header.calc_checksum if header.respond_to? :calc_checksum
      end
    end

    # Recalculate all packet length fields.
    # Headers not responding to +#calc_length+ are skipped.
    # @return [void]
    def calc_length
      @headers.each do |header|
        header.calc_length if header.respond_to? :calc_length
      end
    end

    # Recalculate all calculatable fields (for now: length and checksum)
    # @return [void]
    def calc
      calc_length
      calc_checksum
    end

    # Get packet body, i.e. body from last header
    # @return [Types] +nil+ if last header has no body
    def body
      @headers.last.body if @headers.last.respond_to? :body
    end

    # Set packet body, i.e. body of last header
    # @param [String] str
    # @return [void]
    def body=(str)
      @headers.last.body = str
    end

    # Get binary string, as given by first header's +#to_s+
    # @return [String]
    def to_s
      @headers.first.to_s
    end

    # Write a PCapNG file to disk.
    # @param [String] filename
    # @return [Array] see return from {PcapNG::File#to_file}
    # @see File
    def to_f(filename)
      PcapNG::File.new.array_to_file(filename: filename, array: [self])
    end
    alias write to_f

    # send packet on wire. Use first header +#to_w+ method.
    # @param [String] iface interface name. Default to first non-loopback interface
    # @param [Boolean] calc call {#calc} on packet before sending it
    # @param [Integer] number number of times to send the packets
    # @param [Integer,Float] interval time, in seconds, between sending 2 packets
    # @return [void]
    # @raise [WireError] first header does not respond to +#to_w+
    # @since 2.1.4 add `calc`, `number` and `interval` parameters
    def to_w(iface=nil, calc: false, number: 1, interval: 1)
      iface ||= PacketGen.default_iface

      first = @headers.first
      unless first.respond_to? :to_w
        type = first.protocol_name
        raise WireError, "don't known how to send a #{type} packet on wire"
      end

      # explicit receiver: the +calc+ keyword argument shadows the #calc method
      self.calc if calc
      return first.to_w(iface) if number == 1

      number.times do
        first.to_w(iface)
        sleep interval
      end
    end

    # Encapsulate another packet in +self+
    # @param [Packet] other
    # @param [Boolean] parsing set to +true+ to not update last current header field
    #    from binding with first other's one. Use only when current header field
    #    has its value set accordingly.
    # @return [self] +self+ with new headers from +other+
    # @since 1.1.0
    def encapsulate(other, parsing: false)
      other.headers.each_with_index do |h, i|
        # Only the first header from +other+ honours +parsing+; the following
        # ones are always added in parsing mode.
        add_header h, parsing: (i > 0) || parsing
      end
      self
    end

    # Remove headers from +self+
    # @param [Array<Header::Base>] headers
    # @return [self] +self+ with some headers removed
    # @raise [FormatError] any headers not in +self+
    # @raise [FormatError] removed headers result in an unknown binding
    # @since 1.1.0
    def decapsulate(*headers)
      headers.each do |header|
        idx = @headers.index(header)
        raise FormatError, 'header not in packet!' if idx.nil?

        prev_header = idx > 0 ? @headers[idx - 1] : nil
        next_header = (idx + 1) < @headers.size ? @headers[idx + 1] : nil
        @headers.delete_at(idx)
        # Re-bind the neighbours of the removed header, if it had both.
        add_header(next_header, previous_header: prev_header) if prev_header && next_header
      end
      self
    rescue ArgumentError => ex
      # add_header reports an unknown binding through ArgumentError
      raise FormatError, ex.message
    end

    # Parse a binary string and populate Packet from it.
    # Headers already in packet are dropped first.
    # @param [String] binary_str
    # @param [String,nil] first_header First protocol header. +nil+ means discover it!
    # @return [Packet] self
    # @raise [ArgumentError] +first_header+ is an unknown header
    # @raise [ParseError] +first_header+ is +nil+ and no header may be guessed
    def parse(binary_str, first_header: nil)
      @headers.clear

      if first_header.nil?
        # No decoding forced for first header. Have to guess it!
        first_header = guess_first_header(binary_str)
        raise ParseError, 'cannot identify first header in string' if first_header.nil?
      end

      add first_header
      # Replace the freshly added header with the result of #read, which may be
      # another object (see #guess_first_header).
      @headers[-1, 1] = @headers.last.read(binary_str)

      # Decode upper headers recursively
      decode_bottom_up
      self
    end

    # Get a human-readable description: each header, then packet body.
    # @return [String]
    def inspect
      str = Inspect.dashed_line(self.class)
      @headers.each do |header|
        str << header.inspect
      end
      str << Inspect.inspect_body(body)
    end

    # Compare binary strings from both packets
    # @param [Packet] other
    # @return [Boolean]
    def ==(other)
      to_s == other.to_s
    end

    # Invert all possible fields in packet to create a reply.
    # Headers not responding to +#reply!+ are left untouched.
    # @return [self]
    def reply!
      @headers.each do |header|
        header.reply! if header.respond_to?(:reply!)
      end
      self
    end

    # Forge a new packet from current one with all possible fields
    # inverted. The new packet may be a reply to current one.
    # @return [Packet]
    def reply
      pkt = dup
      pkt.reply!
    end

    private

    # Dup +@headers+ instance variable. Internally used by +#dup+ and +#clone+
    #
    # Copied headers are wired to the new packet the same way {#add_header}
    # wires them: +packet+ is set to +self+ and each header gets the following
    # one as body. Magic header methods are defined on the copy.
    # @return [void]
    def initialize_copy(_other)
      @headers = @headers.map(&:dup)
      @headers.each_with_index do |hdr, idx|
        hdr.packet = self
        upper = @headers[idx + 1]
        # last header keeps its own (copied) body
        hdr[:body] = upper if upper
        add_magic_header_method hdr
      end
    end

    # Get a header from packet, optionally setting some of its attributes.
    # This is the method called by magic header methods (+#ip+, +#udp+, ...).
    # @overload header(klass, layer=1)
    #  @param [Class] klass
    #  @param [Integer] layer 1-based rank among headers of type +klass+
    # @overload header(klass, options={})
    #  @param [String] klass
    #  @param [Hash] options attributes to set on first +klass+ header
    #  @raise [ArgumentError] unknown option
    # @return [Header::Base]
    def header(klass, arg)
      layer = arg.is_a?(Integer) ? arg : 1
      header = @headers.select { |h| h.is_a? klass }[layer - 1]

      if arg.is_a? Hash
        arg.each do |key, value|
          unless header.respond_to? "#{key}="
            raise ArgumentError, "unknown #{key} attribute for #{klass}"
          end

          header.send "#{key}=", value
        end
      end

      header
    end

    # check if protocol is known
    # @param [String] protocol
    # @return [Class] header class for +protocol+
    # @raise [ArgumentError] unknown protocol
    def check_protocol(protocol)
      klass = Header.get_header_class_by_name(protocol)
      raise ArgumentError, "unknown #{protocol} protocol" if klass.nil?

      klass
    end

    # Add a header to packet
    #
    # When a previous header exists (given one, or last header from packet),
    # a binding between its class and +header+ class (or its superclass) must
    # be known. +header+ then becomes body of previous header.
    # @param [Header::Base] header
    # @param [Header::Base] previous_header header to bind +header+ to. When set,
    #   +header+ is not appended to packet headers: caller has to place it.
    # @param [Boolean] parsing when +true+, binding fields of previous header
    #   are not updated
    # @return [void]
    # @raise [ArgumentError] no known binding between previous header and +header+
    def add_header(header, previous_header: nil, parsing: false)
      prev_header = previous_header || @headers.last
      if prev_header
        known = prev_header.class.known_headers
        bindings = known[header.class]
        # fall back on a binding declared for the parent class
        bindings = known[header.class.superclass] if bindings.nil?
        if bindings.nil?
          msg = "#{prev_header.class} knowns no layer association with #{header.protocol_name}. " \
                "Try #{prev_header.class}.bind_layer(#{header.class}, " \
                "#{prev_header.method_name}_proto_field: " \
                "value_for_#{header.method_name})"
          raise ArgumentError, msg
        end

        bindings.set(prev_header) if !bindings.empty? && !parsing
        prev_header[:body] = header
      end
      header.packet = self
      @headers << header unless previous_header

      return if respond_to? header.method_name

      add_magic_header_method header
    end

    # Define, on this packet only, an accessor named after +header.method_name+
    # which forwards to {#header} with +header+ class.
    # @param [Header::Base] header
    # @return [void]
    def add_magic_header_method(header)
      klass = header.class
      # A closure over the class object avoids evaluating generated source code.
      define_singleton_method(header.method_name) do |arg=nil|
        header(klass, arg)
      end
    end

    # Guess the first header of a binary string.
    # @param [String] binary_str
    # @return [String,nil] header class name without its namespace, or +nil+
    #   if no header matches
    def guess_first_header(binary_str)
      first_header = nil
      Header.all.each do |hklass|
        hdr = hklass.new
        # #read may return another object (more specific class)
        hdr = hdr.read(binary_str)
        # First header is found when:
        # * for one known header,
        # * +#parse?+ is true
        # * it exists a known binding with a upper header
        next unless hdr.parse?

        search_header(hdr) do
          first_header = hklass.to_s.gsub(/.*::/, '')
        end
        break unless first_header.nil?
      end
      first_header
    end

    # Decode headers above the last known one, one per iteration, until last
    # header has no body, its body is empty, or no new header may be added.
    # @return [void]
    def decode_bottom_up
      decode_packet_bottom_up = true
      while decode_packet_bottom_up
        last_known_hdr = @headers.last
        break unless last_known_hdr.respond_to? :body
        break if last_known_hdr.body.empty?

        search_header(last_known_hdr) do |nh|
          str = last_known_hdr.body
          nheader = nh.new
          nheader = nheader.read(str)
          next unless nheader.parse?

          add_header nheader, parsing: true
        end
        # stop as soon as an iteration did not add any header
        decode_packet_bottom_up = (@headers.last != last_known_hdr)
      end
    end

    # Yield the first header class known by +hdr+ class whose bindings match
    # +hdr+. Following candidates are not tried.
    # @param [Header::Base] hdr
    # @yieldparam [Class] nh matching upper header class
    # @return [void]
    def search_header(hdr)
      hdr.class.known_headers.each do |nh, bindings|
        next unless bindings.check?(hdr)

        yield nh
        break
      end
    end
  end
end

require_relative 'header'