Skip to content

Commit

Permalink
Added tcpdump reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
ArneTR committed Nov 20, 2024
1 parent f9df24f commit 573b1d6
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 6 deletions.
3 changes: 2 additions & 1 deletion config.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ measurement:
# resolution: 99
# memory.used.procfs.system.provider.MemoryUsedProcfsSystemProvider:
# resolution: 99

# network.connections.tcpdump.system.provider.NetworkConnectionsTcpdumpSystemProvider:
# split_ports: True
#--- Architecture - MacOS
macos:
#--- MacOS: On Mac you only need this provider. Please remove all others!
Expand Down
3 changes: 3 additions & 0 deletions metric_providers/network/connections/tcpdump/system/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Information

See https://docs.green-coding.io/docs/measuring/metric-providers/network-connections-tcpdump-system/ for details.
172 changes: 172 additions & 0 deletions metric_providers/network/connections/tcpdump/system/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import os
import re
from collections import defaultdict
import ipaddress
#import netifaces

from metric_providers.base import BaseMetricProvider
from lib.db import DB

class NetworkConnectionsTcpdumpSystemProvider(BaseMetricProvider):
    """Captures system-wide network connections via tcpdump and appends an
    aggregated, human-readable per-IP summary to the run's logs column."""

    def __init__(self, *, split_ports=True, skip_check=False):
        super().__init__(
            metric_name='network_connections_tcpdump_system_provider',
            metrics={},
            resolution=None,
            unit=None,
            current_dir=os.path.dirname(os.path.abspath(__file__)),
            metric_provider_executable='tcpdump.sh',
            skip_check=skip_check
        )
        # True: break the summary down per port+protocol; False: per protocol only
        self.split_ports = split_ports


    def read_metrics(self, run_id, containers=None):
        # Parse the raw capture that tcpdump.sh wrote to self._filename
        with open(self._filename, 'r', encoding='utf-8') as capture_file:
            captured_lines = capture_file.readlines()

        connection_stats = parse_tcpdump(captured_lines, split_ports=self.split_ports)

        row_count = len(connection_stats)
        if not row_count:
            return 0

        DB().query("""
            UPDATE runs
            SET logs= COALESCE(logs, '') || %s -- append
            WHERE id = %s
            """, params=(generate_stats_string(connection_stats), run_id))
        return row_count

    def get_stderr(self):
        stderr = super().get_stderr()

        if not stderr:
            return stderr

        # Drop only the first line: tcpdump always emits a startup banner like
        # tcpdump: listening on eno2, link-type EN10MB (Ethernet), snapshot length 262144 bytes
        return stderr[stderr.find("\n")+1:]

def get_primary_interface():
    """Return the name of the interface carrying the default IPv4 route.

    Raises:
        RuntimeError: if no default IPv4 gateway is configured.
    """
    # NOTE(review): the module-level `import netifaces` is commented out at the
    # top of this file, so calling this function raised NameError. Importing
    # locally keeps the module importable without netifaces installed while
    # making this code path actually work — confirm netifaces is a declared
    # dependency of the project.
    import netifaces

    gateways = netifaces.gateways()
    if 'default' in gateways and netifaces.AF_INET in gateways['default']:
        # gateways['default'][AF_INET] is (gateway_ip, interface_name)
        return gateways['default'][netifaces.AF_INET][1]

    raise RuntimeError('Could not get primary network interface')

def get_ip_addresses(interface):
    """Return the IPv4 and first global IPv6 address bound to *interface*.

    Args:
        interface: interface name as reported by the OS (e.g. 'eno2').

    Returns:
        list of address strings (at least one entry).

    Raises:
        RuntimeError: when neither an IPv4 nor an IPv6 address was found.
    """
    # NOTE(review): the module-level `import netifaces` is commented out at the
    # top of this file, so calling this function raised NameError. Importing
    # locally until the module-level import situation is resolved.
    import netifaces

    addresses = []

    try:
        addrs = netifaces.ifaddresses(interface)

        if netifaces.AF_INET in addrs:
            addresses.append(addrs[netifaces.AF_INET][0]['addr'])

        if netifaces.AF_INET6 in addrs:
            # Get the first non-link-local IPv6 address
            # NOTE(review): this only skips fe80: and fd00: prefixes; other
            # unique-local addresses (fc00::/7) would still pass — confirm intended
            for addr in addrs[netifaces.AF_INET6]:
                if not addr['addr'].startswith('fe80:') and not addr['addr'].startswith('fd00:'):
                    addresses.append(addr['addr'])
                    break
    except RuntimeError as e:
        print(f"Error getting IP addresses: {e}")

    if not addresses:
        raise RuntimeError('Could not determine either IPv4 or IPv6 address')

    return addresses

def parse_tcpdump(lines, split_ports=False):
    """Aggregate tcpdump verbose output into per-IP traffic statistics.

    Args:
        lines: iterable of raw tcpdump output lines (from `tcpdump -tt --micro -n -v`).
        split_ports: when True, bucket packets per "port/protocol"; otherwise
            per protocol only.

    Returns:
        dict mapping IP -> {'ports': {bucket: {'packets': n, 'bytes': n}},
        'total_bytes': n}. Every packet is counted for both its source and
        destination IP.

    Raises:
        RuntimeError: when a line matches the connection pattern but no
            packet length can be extracted.
    """
    stats = defaultdict(lambda: {'ports': defaultdict(lambda: {'packets': 0, 'bytes': 0}), 'total_bytes': 0})
    ip_pattern = r'(\S+) > (\S+):'
    #tcp_pattern = r'Flags \[(.+?)\]'

    for line in lines:
        ip_match = re.search(ip_pattern, line)
        #tcp_match = re.search(tcp_pattern, line)

        if not ip_match:
            continue

        src, dst = ip_match.groups()
        src_ip, src_port = parse_ip_port(src)
        dst_ip, dst_port = parse_ip_port(dst)

        if not src_ip or not dst_ip:
            continue

        protocol = "UDP" if "UDP" in line else "TCP"

        if protocol == "UDP":
            # For UDP, use the reported length
            length_match = re.search(r'length:? (\d+)', line)
            if not length_match:
                raise RuntimeError(f"Could not find UDP packet length for line: {line}")
            packet_length = int(length_match.group(1))
        else:
            # For TCP, estimate packet length (this is a simplification)
            length_match = re.search(r'length (\d+)', line)

            if length_match:
                packet_length = 40 + int(length_match.group(1)) # Assuming 40 bytes for IP + TCP headers
            elif '.53 ' in line or '.53:' in line: # try DNS match: answer lines end with '(NN)'
                dns_packet_length = re.match(r'.*\((\d+)\)$', line)
                if not dns_packet_length:
                    raise RuntimeError(f"Could not find TCP packet length for line: {line}")
                # BUGFIX: the original raised unconditionally even after the DNS
                # length was parsed successfully (and with a non-f-string message)
                packet_length = int(dns_packet_length[1])
            else:
                raise RuntimeError(f"No packet length was detected for line {line}")

        # Bucket key: per port+protocol, or per protocol only
        src_bucket = f"{src_port}/{protocol}" if split_ports else protocol
        dst_bucket = f"{dst_port}/{protocol}" if split_ports else protocol

        # Update source IP stats
        stats[src_ip]['ports'][src_bucket]['packets'] += 1
        stats[src_ip]['ports'][src_bucket]['bytes'] += packet_length
        stats[src_ip]['total_bytes'] += packet_length

        # Update destination IP stats
        stats[dst_ip]['ports'][dst_bucket]['packets'] += 1
        stats[dst_ip]['ports'][dst_bucket]['bytes'] += packet_length
        stats[dst_ip]['total_bytes'] += packet_length

    return stats

def parse_ip_port(address):
    """Split a tcpdump endpoint token like '10.0.0.1.443' into (ip, port).

    tcpdump appends the port to the address with a dot for both IPv4 and
    IPv6; IPv6 addresses may additionally be wrapped in brackets, which are
    stripped. Returns (None, None) when no port can be split off or the
    address part is not a valid IP.
    """
    try:
        ip, port = address.rsplit('.', 1)
        if ']' in address: # bracketed IPv6
            ip = ip.strip('[]')

        ipaddress.ip_address(ip) # raises ValueError for invalid addresses
        return ip, int(port)
    except ValueError:
        return None, None

def generate_stats_string(stats, filter_host=False):
    """Render the per-IP stats from parse_tcpdump() into a readable string.

    Args:
        stats: mapping as returned by parse_tcpdump().
        filter_host: when True, omit entries for the host's own IP addresses.

    Returns:
        Multi-line string, one section per IP.
    """
    # BUGFIX: only resolve the host's own addresses when we actually filter
    # on them. The lookup needs the optional netifaces package and can raise,
    # so it must not run on the default (filter_host=False) path, which is
    # the only one read_metrics() uses.
    ip_addresses = get_ip_addresses(get_primary_interface()) if filter_host else []

    buffer = []
    for ip, data in stats.items():
        if filter_host and ip in ip_addresses:
            continue

        buffer.append(f"IP: {ip} (as sender or receiver. aggregated)")
        buffer.append(f" Total transmitted data: {data['total_bytes']} bytes")
        buffer.append(' Ports:')
        for port, port_data in data['ports'].items():
            buffer.append(f"    {port}: {port_data['packets']} packets, {port_data['bytes']} bytes")
        buffer.append('\n')

    return '\n'.join(buffer)
25 changes: 25 additions & 0 deletions metric_providers/network/connections/tcpdump/system/tcpdump.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#! /bin/bash
set -euo pipefail

# -c: only check that tcpdump can be started (permission probe), then exit
check_system=false
while getopts "c" o; do
    case "$o" in
        c)
            check_system=true
            ;;
    esac
done


if $check_system; then
    # Try to capture one packet only. Since no network traffic might be
    # happening we also limit the wait to 3 seconds; timeout then exits
    # with 124, which is fine — only a genuine tcpdump failure (exit 1)
    # means missing permissions.
    # BUGFIX: under `set -e` a failing command substitution aborted the
    # script before the status check ever ran; `|| ret=$?` captures the
    # status without triggering errexit.
    ret=0
    first_line=$(timeout 3 tcpdump -tt --micro -n -v -c 1) || ret=$?
    if [ $ret -eq 1 ]; then
        echo "tcpdump could not be started. Missing sudo permissions?"
        exit 1
    fi
    exit 0
fi

tcpdump -tt --micro -n -v
8 changes: 4 additions & 4 deletions runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,7 @@ def save_stdout_logs(self):
if logs_as_str:
DB().query("""
UPDATE runs
SET logs=%s
SET logs= COALESCE(logs, '') || %s -- append
WHERE id = %s
""", params=(logs_as_str, self._run_id))

Expand Down Expand Up @@ -1623,19 +1623,19 @@ def run(self):
raise exc
finally:
try:
self.read_and_cleanup_processes()
self.stop_metric_providers()
except BaseException as exc:
self.add_to_log(exc.__class__.__name__, str(exc))
raise exc
finally:
try:
self.save_notes_runner()
self.read_and_cleanup_processes()
except BaseException as exc:
self.add_to_log(exc.__class__.__name__, str(exc))
raise exc
finally:
try:
self.stop_metric_providers()
self.save_notes_runner()
except BaseException as exc:
self.add_to_log(exc.__class__.__name__, str(exc))
raise exc
Expand Down
2 changes: 1 addition & 1 deletion tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ def run_until(self, step):
self.__runner.update_start_and_end_times()
self.__runner.store_phases()
self.__runner.read_container_logs()
self.__runner.stop_metric_providers()
self.__runner.read_and_cleanup_processes()
self.__runner.save_notes_runner()
self.__runner.stop_metric_providers()
self.__runner.save_stdout_logs()

if self.__runner._dev_no_phase_stats is False:
Expand Down

0 comments on commit 573b1d6

Please sign in to comment.