import argparse import os import subprocess import numpy as np from tqdm import tqdm import pandas as pd import pcapng from struct import unpack from PIL import Image # Create the parser parser = argparse.ArgumentParser(description="Process a pcap file.") # Add an argument for the pcap file, with a default value parser.add_argument('input_file', nargs='?', default='in.pcap', help='The pcap file to process') # Parse the arguments args = parser.parse_args() # Now use args.input_file as the file to process input_file = args.input_file basename = os.path.splitext(os.path.basename(input_file))[0] # Read packets from a pcap file scanner = pcapng.scanner.FileScanner(open(input_file, "rb")) blocks = list(tqdm(scanner)) # Helper function to safely get an attribute from an object def tryget(obj, att): if hasattr(obj, att): return getattr(obj, att) return None # Create a DataFrame with packet lengths df = pd.DataFrame( [{"index": i, "length": tryget(obj, "packet_len")} for i, obj in enumerate(blocks)] ) # Filter and extract data packets of a specific length data = [blocks[i] for i in df[df.length == 6972.0].index] # Remove the UDP header from the packets raw = [d.packet_data[0x2A:] for d in data] # Function to parse packet data def parse(data): hdr = 4 + 2 * 7 # Header length # Unpack data into variables c1, c2, part, a, ffaa, b, c, d = unpack(">Lhhhhhhh", data[:hdr]) ret = locals() del ret["data"] del ret["hdr"] ret["data"] = data[hdr:] return ret # Parse each packet and create a DataFrame df = pd.DataFrame([parse(d) for d in raw]) df2 = df[[c for c in df.columns if c != "data"]] # Function to group data into frames def getframes(df): frames = [] current = [] for i, row in df.iterrows(): if row.part == 0: if len(current) > 0: frames.append(current) current = [] current.append(row["data"]) if len(current) > 0: frames.append(current) return [b"".join(parts) for parts in frames] # Function to convert binary frame data into images def image16(frame, width, height, pixelformat=">H"): return [ Image.fromarray(np.frombuffer(frame, dtype=pixelformat).reshape(width, height)) for frame in frames if len(frame) == 2 * width * height ] # Get frames and convert them to images frames = getframes(df) images = image16(frames, 384, 288) # Create the directory for frames if not exists frame_dir = f"frames/{basename}" if not os.path.exists(frame_dir): os.makedirs(frame_dir) # Save each image as a PNG file for i, img in enumerate(tqdm(images)): img.save(f'frames/{basename}/{basename}_{i:04}.png') # Produce a video from the saved images ffmpeg_input = f"frames/{basename}/{basename}_%04d.png" command = [ "ffmpeg", "-y", # Overwrite output file without asking "-hide_banner", # Hide banner "-loglevel", "info", # Log level "-f", "image2", # Input format "-framerate", "25", # Framerate "-i", ffmpeg_input, # Input file pattern "-vf", "transpose=1", # Video filter for transposing "-s", "384x288", # Size of one frame "-vcodec", "libx264", # Video codec "-pix_fmt", "yuv420p", # Pixel format: YUV 4:2:0 "thermal.mp4", # Output file in MP4 container ] subprocess.run(command) print("to play: ffplay thermal.mp4")