decode.py: rewrite to use iterators

This commit is contained in:
Alon Levy 2023-12-31 13:40:47 +02:00
parent 312637fa72
commit 6725a9af63

62
decode.py Normal file → Executable file
View File

@ -1,3 +1,4 @@
#!/usr/bin/env python3
import argparse import argparse
import os import os
import subprocess import subprocess
@ -25,7 +26,7 @@ basename = os.path.splitext(os.path.basename(input_file))[0]
# Read packets from a pcap file # Read packets from a pcap file
scanner = pcapng.scanner.FileScanner(open(input_file, "rb")) scanner = pcapng.scanner.FileScanner(open(input_file, "rb"))
blocks = list(tqdm(scanner)) blocks = tqdm(scanner)
# Helper function to safely get an attribute from an object # Helper function to safely get an attribute from an object
def tryget(obj, att): def tryget(obj, att):
@ -34,17 +35,22 @@ def tryget(obj, att):
return None return None
# Create a DataFrame with packet lengths
df = pd.DataFrame( def rightsize(it):
[{"index": i, "length": tryget(obj, "packet_len")} for i, obj in enumerate(blocks)] for i, obj in enumerate(it):
) if not hasattr(obj, 'packet_len'):
continue
len = obj.packet_len
if len != 6972:
continue
yield obj.packet_data
# Filter and extract data packets of a specific length def removestart(it):
data = [blocks[i] for i in df[df.length == 6972.0].index] "Remove the UDP header from the packets"
for x in it:
yield x[0x2A:]
# Remove the UDP header from the packets
raw = [d.packet_data[0x2A:] for d in data]
# Function to parse packet data # Function to parse packet data
@ -59,38 +65,34 @@ def parse(data):
return ret return ret
# Parse each packet and create a DataFrame def parsed(it):
df = pd.DataFrame([parse(d) for d in raw]) for x in it:
df2 = df[[c for c in df.columns if c != "data"]] yield parse(x)
# Function to group data into frames # Function to group data into frames
def getframes(df): def frames(it):
frames = []
current = [] current = []
for i, row in df.iterrows(): for obj in it:
if row.part == 0: if obj['part'] == 0:
if len(current) > 0: if len(current) > 0:
frames.append(current) yield b"".join(current)
current = [] current = []
current.append(row["data"]) current.append(obj["data"])
if len(current) > 0: if len(current) > 0:
frames.append(current) yield b"".join(current)
return [b"".join(parts) for parts in frames]
# Function to convert binary frame data into images def iterimages(it, width, height, pixelformat=">H"):
def image16(frame, width, height, pixelformat=">H"): for frame in it:
return [ if len(frame) != width * height * 2: # 16 bpp
Image.fromarray(np.frombuffer(frame, dtype=pixelformat).reshape(width, height)) continue
for frame in frames yield Image.fromarray(np.frombuffer(frame, dtype=pixelformat).reshape(width, height))
if len(frame) == 2 * width * height
]
# Get frames and convert them to images # Get frames and convert them to images
frames = getframes(df) frames = frames(parsed(removestart(rightsize(blocks))))
images = image16(frames, 384, 288) images = iterimages(it=frames, width=384, height=288)
# Create the directory for frames if not exists # Create the directory for frames if not exists
frame_dir = f"frames/{basename}" frame_dir = f"frames/{basename}"
@ -98,7 +100,7 @@ if not os.path.exists(frame_dir):
os.makedirs(frame_dir) os.makedirs(frame_dir)
# Save each image as a PNG file # Save each image as a PNG file
for i, img in enumerate(tqdm(images)): for i, img in enumerate(images):
img.save(f'frames/{basename}/{basename}_{i:04}.png') img.save(f'frames/{basename}/{basename}_{i:04}.png')
# Produce a video from the saved images # Produce a video from the saved images