#!/usr/bin/env python3
"""Extract 16-bit image frames from a packet capture and render a video.

Pipeline:
  1. scan the capture file block by block,
  2. keep only packets of one fixed length,
  3. strip the leading network headers,
  4. parse the 18-byte per-packet header,
  5. concatenate payloads into frames (a packet with ``part == 0`` starts a
     new frame),
  6. save every correctly sized frame as a PNG,
  7. call ffmpeg to assemble the PNGs into ``thermal.mp4``.
"""
import argparse
import os
import subprocess
from struct import calcsize, unpack

import numpy as np
import pandas as pd  # not used by this script; kept from the original import list

# NOTE: pcapng, tqdm and PIL are imported inside the functions that use them
# so that the pure parsing helpers below can be imported (and unit-tested)
# without the capture/imaging packages installed.

PACKET_LEN = 6972            # only captured packets of exactly this length are used
HEADER_SKIP = 0x2A           # bytes dropped from the start of every packet
HEADER_FORMAT = ">Lhhhhhhh"  # big-endian: one uint32 followed by seven int16
HEADER_SIZE = calcsize(HEADER_FORMAT)  # 4 + 2 * 7 = 18 bytes
FRAME_WIDTH = 384
FRAME_HEIGHT = 288
OUTPUT_VIDEO = "thermal.mp4"


def tryget(obj, att):
    """Return ``getattr(obj, att)``, or ``None`` if the attribute is missing."""
    return getattr(obj, att, None)


def rightsize(it, packet_len=PACKET_LEN):
    """Yield ``packet_data`` of every block whose ``packet_len`` matches.

    Blocks without a ``packet_len`` attribute (i.e. anything that is not a
    packet block) and packets of any other length are skipped.
    """
    for obj in it:
        if getattr(obj, "packet_len", None) == packet_len:
            yield obj.packet_data


def removestart(it, offset=HEADER_SKIP):
    """Yield each packet with its first ``offset`` bytes removed.

    The default of 0x2A (42) bytes presumably corresponds to
    Ethernet (14) + IPv4 without options (20) + UDP (8) headers -- confirm
    against the capture if the link layer differs.
    """
    for packet in it:
        yield packet[offset:]


def parse(data):
    """Split one payload into its header fields and the remaining bytes.

    Returns a dict with the keys ``c1, c2, part, a, ffaa, b, c, d`` (the
    unpacked header values, in wire order) and ``data`` (everything after
    the header).  Raises ``struct.error`` if ``data`` is shorter than the
    header.
    """
    c1, c2, part, a, ffaa, b, c, d = unpack(HEADER_FORMAT, data[:HEADER_SIZE])
    # Explicit dict instead of locals(): same keys, but not dependent on
    # which local variables happen to exist in this function.
    return {
        "c1": c1,
        "c2": c2,
        "part": part,
        "a": a,
        "ffaa": ffaa,
        "b": b,
        "c": c,
        "d": d,
        "data": data[HEADER_SIZE:],
    }


def parsed(it):
    """Yield ``parse(x)`` for every payload in ``it``."""
    for payload in it:
        yield parse(payload)


def frames(it):
    """Group parsed packets into frames and yield each frame as ``bytes``.

    A packet whose ``part`` field is 0 starts a new frame; the payloads
    collected so far are joined and yielded first.  Whatever remains at the
    end of the input is yielded as a final frame.
    """
    current = []
    for obj in it:
        if obj["part"] == 0 and current:
            yield b"".join(current)
            current = []
        current.append(obj["data"])
    if current:
        yield b"".join(current)


def iterimages(it, width, height, pixelformat=">H"):
    """Yield a PIL image for every frame of exactly the expected byte size.

    Frames whose length is not ``width * height * bytes_per_pixel`` (for
    example incomplete frames) are silently skipped.  ``pixelformat`` is a
    NumPy dtype string; the default is big-endian unsigned 16-bit.
    """
    from PIL import Image

    dtype = np.dtype(pixelformat)
    expected_size = width * height * dtype.itemsize
    for frame in it:
        if len(frame) != expected_size:
            continue
        # NOTE: the array gets `width` rows and `height` columns, so the
        # saved image is `height` px wide and `width` px tall; the ffmpeg
        # step below applies transpose=1 to the PNGs afterwards.
        pixels = np.frombuffer(frame, dtype=dtype).reshape(width, height)
        yield Image.fromarray(pixels)


def main():
    """Parse the command line, dump the frames as PNGs and encode the video."""
    import pcapng
    from tqdm import tqdm

    # Create the parser; the capture file is optional and defaults to in.pcap
    parser = argparse.ArgumentParser(description="Process a pcap file.")
    parser.add_argument('input_file', nargs='?', default='in.pcap',
                        help='The pcap file to process')
    args = parser.parse_args()

    input_file = args.input_file
    basename = os.path.splitext(os.path.basename(input_file))[0]

    # Create the directory for frames if it does not exist yet
    frame_dir = f"frames/{basename}"
    os.makedirs(frame_dir, exist_ok=True)

    # Read packets from the capture and save each image as a PNG file.
    # The whole pipeline is lazy, so it must be consumed while the file is
    # still open.
    saved = 0
    with open(input_file, "rb") as capture:
        blocks = tqdm(pcapng.scanner.FileScanner(capture))
        frame_iter = frames(parsed(removestart(rightsize(blocks))))
        images = iterimages(it=frame_iter, width=FRAME_WIDTH, height=FRAME_HEIGHT)
        for i, img in enumerate(images):
            img.save(f"{frame_dir}/{basename}_{i:04}.png")
            saved += 1

    if saved == 0:
        raise SystemExit(f"no complete frames found in {input_file}")

    # Produce a video from the saved images
    ffmpeg_input = f"{frame_dir}/{basename}_%04d.png"
    command = [
        "ffmpeg",
        "-y",                                     # Overwrite output file without asking
        "-hide_banner",                           # Hide banner
        "-loglevel", "info",                      # Log level
        "-f", "image2",                           # Input format
        "-framerate", "25",                       # Framerate
        "-i", ffmpeg_input,                       # Input file pattern
        "-vf", "transpose=1",                     # Video filter for transposing
        "-s", f"{FRAME_WIDTH}x{FRAME_HEIGHT}",    # Size of one frame
        "-vcodec", "libx264",                     # Video codec
        "-pix_fmt", "yuv420p",                    # Pixel format: YUV 4:2:0
        OUTPUT_VIDEO,                             # Output file in MP4 container
    ]
    result = subprocess.run(command)
    if result.returncode != 0:
        raise SystemExit(f"ffmpeg failed with exit code {result.returncode}")

    print(f"to play: ffplay {OUTPUT_VIDEO}")


if __name__ == "__main__":
    main()