Interactive Python script for network latency calculation & jitter analysis.

1 – Overview:

Analyze latency, jitter, and anomalies directly from Wireshark CSV captures.

✅ Supports both IP and L2 packets

✅ Interactive menu-driven interface

✅ Directional (RFC-compliant) & bidirectional analysis

2 – Minimum Requirements:

Python 3.x

Wireshark CSV export with headers: Protocol, Source, Destination, Delta time

Terminal/console for interactive prompts

3 – Workflow Diagram:

Start

Load CSV file

List protocols → Select protocol

List SRC→DST pairs → Select pair or all pairs

Choose analysis type:

├─ Direction-based (SRC→DST) → RFC-compliant

└─ Bidirectional (SRC↔DST) → Flow-based

Calculate:

- Mean Latency

- Jitter (standard deviation – the mathematical definition)

- Jitter (RFC-style method for RTP: mean absolute difference between consecutive delta times)

Detect anomalies (> mean + 2*SD)

Print results & optionally repeat

End

4 – How it Works:

Extracts latencies from CSV per protocol/IP pair

Calculates mean, jitter (SD & RFC), and anomalies

Interactive menus ensure valid numeric and y/n inputs

Works for both IP packets and L2 headers

Optionally combines traffic in both directions

5 – Benefits:

Quick automated analysis of network captures

Useful for network administrators and network engineers.


import csv
import statistics

def analyze_latency(latencies):
    """Summarise a series of latency samples.

    Args:
        latencies: Sequence of numeric samples, in capture order.

    Returns:
        A ``(mean, std_dev, rfc_jitter)`` tuple where ``mean`` is the
        arithmetic mean, ``std_dev`` is the population standard deviation
        and ``rfc_jitter`` is the average absolute difference between
        consecutive samples (``0`` when there are fewer than two samples).

    Raises:
        statistics.StatisticsError: If ``latencies`` is empty.
    """
    average = statistics.mean(latencies)
    spread = statistics.pstdev(latencies)

    # Absolute change between each sample and the one that follows it.
    step_changes = [
        abs(later - earlier)
        for earlier, later in zip(latencies, latencies[1:])
    ]
    if step_changes:
        consecutive_jitter = sum(step_changes) / len(step_changes)
    else:
        consecutive_jitter = 0

    return average, spread, consecutive_jitter

def detect_anomalies(latencies, mean, std_dev, threshold_multiplier=2):
    """Return the samples that lie strictly above the anomaly cutoff.

    The cutoff is ``mean + threshold_multiplier * std_dev``; samples equal
    to the cutoff are not reported.

    Args:
        latencies: Iterable of numeric samples.
        mean: Mean of the samples.
        std_dev: Standard deviation of the samples.
        threshold_multiplier: Number of standard deviations above the mean
            at which a sample counts as anomalous.

    Returns:
        A list of the anomalous samples, in their original order.
    """
    cutoff = mean + threshold_multiplier * std_dev
    outliers = []
    for sample in latencies:
        if sample > cutoff:
            outliers.append(sample)
    return outliers

def read_csv(filename):
    """Load packet records from a CSV export.

    The delta value is read from the "Delta time" column, falling back to
    "Delta time displayed". Rows whose delta is missing or is not a valid
    number are skipped rather than being recorded as ``0``, so a capture
    without a delta column yields no data instead of all-zero statistics.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        A list of ``(protocol, source, destination, delta)`` tuples, where
        the first three items are whitespace-stripped strings ("" when the
        column is absent) and ``delta`` is a float.

    Raises:
        OSError: If the file cannot be opened.
    """
    data = []
    with open(filename, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            raw_delta = row.get("Delta time")
            if raw_delta is None:
                raw_delta = row.get("Delta time displayed")
            if raw_delta is None:
                # No delta column, or a short row: nothing to measure.
                continue
            try:
                delta = float(raw_delta)
            except (TypeError, ValueError):
                continue

            # DictReader fills the missing fields of short rows with None,
            # so normalise to "" before stripping.
            protocol = (row.get("Protocol") or "").strip()
            src = (row.get("Source") or "").strip()
            dst = (row.get("Destination") or "").strip()
            data.append((protocol, src, dst, delta))
    return data

def input_number(prompt, min_val=None, max_val=None):
    """Prompt until the user enters a non-negative whole number in range.

    Surrounding whitespace is ignored. Input is validated with
    ``str.isdecimal`` rather than ``str.isdigit`` because the latter also
    accepts characters such as "²" that ``int()`` cannot convert.

    Args:
        prompt: Text shown to the user on every attempt.
        min_val: Smallest accepted value, or None for no lower bound.
        max_val: Largest accepted value, or None for no upper bound.

    Returns:
        The accepted number as an int.
    """
    while True:
        text = input(prompt).strip()
        if not text.isdecimal():
            print("Invalid input. Please enter a number.")
            continue
        try:
            number = int(text)
        except ValueError:
            # e.g. a digit string longer than the interpreter's int limit.
            print("Invalid input. Please enter a number.")
            continue

        too_small = min_val is not None and number < min_val
        too_large = max_val is not None and number > max_val
        if too_small or too_large:
            if min_val is not None and max_val is not None:
                print(f"Please enter a number between {min_val} and {max_val}.")
            elif min_val is not None:
                print(f"Please enter a number of at least {min_val}.")
            else:
                print(f"Please enter a number of at most {max_val}.")
            continue
        return number

def input_yes_no(prompt):
    """Prompt until the user answers 'y' or 'n'.

    The answer is stripped of surrounding whitespace and lower-cased before
    it is checked, so " Y " is accepted.

    Args:
        prompt: Text shown to the user on every attempt.

    Returns:
        Either ``'y'`` or ``'n'``.
    """
    accepted = ('y', 'n')
    answer = input(prompt).strip().lower()
    while answer not in accepted:
        print("Invalid input. Press either 'y' or 'n'.")
        answer = input(prompt).strip().lower()
    return answer

def calculate_for_pair(data, protocol, src, dst, bidirectional=False):
    """Print latency and jitter statistics for one endpoint pair.

    Records are selected when their protocol equals ``protocol`` and their
    (source, destination) equals ``(src, dst)``; with ``bidirectional`` set,
    the reverse direction is included as well. Delta values are multiplied
    by 1000 before analysis and reported with an "ms" suffix.

    Args:
        data: Records indexable as (protocol, source, destination, delta).
        protocol: Protocol name to filter on.
        src: Source endpoint.
        dst: Destination endpoint.
        bidirectional: Also include records travelling from dst to src.

    Returns:
        None. Results, or a notice that nothing matched, are printed.
    """
    if bidirectional:
        wanted_directions = ((src, dst), (dst, src))
    else:
        wanted_directions = ((src, dst),)

    latencies = []
    for record in data:
        if record[0] == protocol and (record[1], record[2]) in wanted_directions:
            latencies.append(record[3]*1000)

    if not latencies:
        print(f"No latency values found for {src} ↔ {dst} in protocol {protocol}.")
        return

    mean, jitter_sd, jitter_rfc = analyze_latency(latencies)
    anomalies = detect_anomalies(latencies, mean, jitter_sd)

    if bidirectional:
        direction_str = "Bidirectional"
    else:
        direction_str = "Direction-based (SRC→DST)"

    report_lines = (
        "\n--- Latency & Jitter Results ---",
        f"Protocol: {protocol}",
        f"SRC ↔ DST (L2 header): {src or 'N/A'} ↔ {dst or 'N/A'}",
        f"Mode: {direction_str}",
        f"Number of packets: {len(latencies)}",
        f"Mean Latency: {mean:.2f} ms",
        f"Jitter (Standard Deviation): {jitter_sd:.2f} ms",
        f"Jitter (RFC method): {jitter_rfc:.2f} ms",
    )
    for line in report_lines:
        print(line)

    if not anomalies:
        print("\nNo anomalies detected for this protocol and IP/L2 pair.")
        return

    print(f"\n⚠️ Anomalies detected ({len(anomalies)} packets exceed mean + 2*SD):")
    formatted = [f"{a:.2f} ms" for a in anomalies]
    print(", ".join(formatted))

def select_ip_pair(data, protocol):
    """Interactively pick endpoint pairs of one protocol and analyse them.

    Shows a numbered menu of the distinct (source, destination) pairs seen
    for ``protocol`` plus an "all pairs" entry, asks whether to analyse
    bidirectionally, runs the analysis, and repeats until the user declines
    to check another pair.

    When "all pairs" is combined with bidirectional analysis, each flow is
    analysed once: A→B and B→A select exactly the same packets, so the
    second direction would only repeat the first report.

    Args:
        data: Records indexable as (protocol, source, destination, delta).
        protocol: Protocol name whose pairs are offered.

    Returns:
        None.
    """
    pairs = sorted(set((d[1], d[2]) for d in data if d[0] == protocol))
    if not pairs:
        print(f"No packets found for protocol {protocol}.")
        return

    all_pairs_choice = len(pairs) + 1

    while True:
        print(f"\nAvailable SRC → DST pairs for protocol {protocol}:")
        for i, (src, dst) in enumerate(pairs, 1):
            header_type = "IP" if src and dst else "Non-IP"
            print(f"{i}. {src or 'N/A'} → {dst or 'N/A'} ({header_type})")
        print(f"{all_pairs_choice}. All pairs for this protocol")

        choice = input_number("Select a pair number: ", 1, all_pairs_choice)
        bidirectional = input_yes_no("Do you want to analyze bidirectional flow? (y/n): ") == 'y'

        if choice == all_pairs_choice:
            seen_flows = set()
            for src, dst in pairs:
                if bidirectional:
                    # Direction-agnostic key: skip the reverse of a flow
                    # that has already been handled in this pass.
                    flow = frozenset((src, dst))
                    if flow in seen_flows:
                        continue
                    seen_flows.add(flow)
                if not src or not dst:
                    proceed = input_yes_no("This is not an IP header. Do you still want to analyze jitter? (y/n): ")
                    if proceed == 'n':
                        continue
                calculate_for_pair(data, protocol, src, dst, bidirectional)
        else:
            src, dst = pairs[choice-1]
            if not src or not dst:
                proceed = input_yes_no("This is not an IP header. Do you still want to analyze jitter? (y/n): ")
                if proceed == 'n':
                    # Declined: go straight back to the pair menu.
                    continue
            calculate_for_pair(data, protocol, src, dst, bidirectional)

        more = input_yes_no("\nDo you want to check another IP/L2 pair for this protocol? (y/n): ")
        if more == 'n':
            break

def main(filename="test2-jitter.csv"):
    """Run the interactive latency/jitter analysis on a CSV capture.

    Loads the capture, then repeatedly lets the user choose a protocol and
    hands over to the pair-selection menu until the user chooses to exit.

    Args:
        filename: Path of the CSV export to analyse. Change the default, or
            pass a path, to point at your own capture.

    Returns:
        None.
    """
    try:
        data = read_csv(filename)
    except OSError as exc:
        print(f"Could not read '{filename}': {exc}")
        return

    if not data:
        print("No data found. Check CSV headers and file content.")
        return

    # The records never change, so the protocol list is built once.
    protocols = sorted(set(d[0] for d in data if d[0]))
    if not protocols:
        # Without this guard the menu would ask for a number between 1 and
        # 0, which no input can satisfy.
        print("No protocol names found. Check the 'Protocol' column in the CSV.")
        return

    while True:
        print("\nAvailable Protocols in CSV:")
        for i, proto in enumerate(protocols, 1):
            print(f"{i}. {proto}")

        choice = input_number("Select a protocol number: ", 1, len(protocols))
        selected_proto = protocols[choice-1]

        select_ip_pair(data, selected_proto)

        more_proto = input_yes_no("\nDo you want to check another protocol? (y/n): ")
        if more_proto == 'n':
            print("Exiting program.")
            break

# Start the interactive session only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

        

Output sample is given below:

Available Protocols in CSV:

1. ARP

2. BROWSER

3. CDP

4. CLDAP

5. DCERPC

6. DHCP

7. DNS

8. DRSUAPI

9. EAP

10. EAPOL

11. EPM

12. HTTP

13. ICMP

14. IGMPv2

15. IGMPv3

Select a protocol number: 7

Available SRC → DST pairs for protocol DNS:

1. 10.41.105.71 → 10.47.176.21 (IP)

2. 10.41.105.73 → 10.47.176.21 (IP)

3. 10.41.105.73 → 10.47.176.22 (IP)

4. 10.41.205.3 → 10.47.176.21 (IP)

5. 10.41.205.3 → 10.47.176.22 (IP)

6. 10.47.176.21 → 10.41.105.71 (IP)

7. 10.47.176.21 → 10.41.205.3 (IP)

8. 10.47.176.22 → 10.41.205.3 (IP)

9. All pairs for this protocol

Select a pair number: 9

Do you want to analyze bidirectional flow? (y/n): y

--- Latency & Jitter Results ---

Protocol: DNS

SRC ↔ DST (L2 header): 10.41.105.71 ↔ 10.47.176.21

Mode: Bidirectional

Number of packets: 4

Mean Latency: 0.00 ms

Jitter (Standard Deviation): 0.00 ms

Jitter (RFC method): 0.00 ms

No anomalies detected for this protocol and IP/L2 pair.

--- Latency & Jitter Results ---

Protocol: DNS

SRC ↔ DST (L2 header): 10.41.105.73 ↔ 10.47.176.21

Mode: Bidirectional

Number of packets: 65

Mean Latency: 0.00 ms

Jitter (Standard Deviation): 0.00 ms

Jitter (RFC method): 0.00 ms

No anomalies detected for this protocol and IP/L2 pair.

--- Latency & Jitter Results ---

Protocol: DNS

SRC ↔ DST (L2 header): 10.41.105.73 ↔ 10.47.176.22

Mode: Bidirectional

Number of packets: 88

Mean Latency: 0.00 ms

Jitter (Standard Deviation): 0.00 ms

Jitter (RFC method): 0.00 ms

No anomalies detected for this protocol and IP/L2 pair.

--- Latency & Jitter Results ---

Protocol: DNS

SRC ↔ DST (L2 header): 10.41.205.3 ↔ 10.47.176.21

Mode: Bidirectional

Number of packets: 108

Mean Latency: 0.00 ms

Jitter (Standard Deviation): 0.00 ms

Jitter (RFC method): 0.00 ms

No anomalies detected for this protocol and IP/L2 pair.

--- Latency & Jitter Results ---

Protocol: DNS

SRC ↔ DST (L2 header): 10.41.205.3 ↔ 10.47.176.22

Mode: Bidirectional

Number of packets: 12

Mean Latency: 0.00 ms

Jitter (Standard Deviation): 0.00 ms

Jitter (RFC method): 0.00 ms

No anomalies detected for this protocol and IP/L2 pair.

--- Latency & Jitter Results ---

Protocol: DNS

SRC ↔ DST (L2 header): 10.47.176.21 ↔ 10.41.105.71

Mode: Bidirectional

Number of packets: 4

Mean Latency: 0.00 ms

Jitter (Standard Deviation): 0.00 ms

Jitter (RFC method): 0.00 ms

No anomalies detected for this protocol and IP/L2 pair.

--- Latency & Jitter Results ---

Protocol: DNS

SRC ↔ DST (L2 header): 10.47.176.21 ↔ 10.41.205.3

Mode: Bidirectional

Number of packets: 108

Mean Latency: 0.00 ms

Jitter (Standard Deviation): 0.00 ms

Jitter (RFC method): 0.00 ms

No anomalies detected for this protocol and IP/L2 pair.

--- Latency & Jitter Results ---

Protocol: DNS

SRC ↔ DST (L2 header): 10.47.176.22 ↔ 10.41.205.3

Mode: Bidirectional

Number of packets: 12

Mean Latency: 0.00 ms

Jitter (Standard Deviation): 0.00 ms

Jitter (RFC method): 0.00 ms

No anomalies detected for this protocol and IP/L2 pair.

Do you want to check another IP/L2 pair for this protocol? (y/n): n

Do you want to check another protocol? (y/n): n

Exiting program.        

To view or add a comment, sign in

More articles by Kanagavalli Rajesh Kumar

Explore content categories