thermalcam_decoder/decode.py
yair-mantis e15ec35ddd viewer
2023-12-29 02:26:24 +02:00

124 lines
3.4 KiB
Python

import argparse
import os
import subprocess
import numpy as np
from tqdm import tqdm
import pandas as pd
import pcapng
from struct import unpack
from PIL import Image
# Create the parser
parser = argparse.ArgumentParser(description="Process a pcap file.")
# Add an argument for the pcap file, with a default value
parser.add_argument('input_file', nargs='?', default='in.pcap', help='The pcap file to process')
# Parse the arguments
args = parser.parse_args()
# Now use args.input_file as the file to process
input_file = args.input_file
basename = os.path.splitext(os.path.basename(input_file))[0]
# Read packets from a pcap file
scanner = pcapng.scanner.FileScanner(open(input_file, "rb"))
blocks = list(tqdm(scanner))
# Helper function to safely get an attribute from an object
def tryget(obj, att):
if hasattr(obj, att):
return getattr(obj, att)
return None
# Create a DataFrame with packet lengths
df = pd.DataFrame(
[{"index": i, "length": tryget(obj, "packet_len")} for i, obj in enumerate(blocks)]
)
# Filter and extract data packets of a specific length
data = [blocks[i] for i in df[df.length == 6972.0].index]
# Remove the UDP header from the packets
raw = [d.packet_data[0x2A:] for d in data]
# Function to parse packet data
def parse(data):
hdr = 4 + 2 * 7 # Header length
# Unpack data into variables
c1, c2, part, a, ffaa, b, c, d = unpack(">Lhhhhhhh", data[:hdr])
ret = locals()
del ret["data"]
del ret["hdr"]
ret["data"] = data[hdr:]
return ret
# Parse each packet and create a DataFrame
df = pd.DataFrame([parse(d) for d in raw])
df2 = df[[c for c in df.columns if c != "data"]]
# Function to group data into frames
def getframes(df):
frames = []
current = []
for i, row in df.iterrows():
if row.part == 0:
if len(current) > 0:
frames.append(current)
current = []
current.append(row["data"])
if len(current) > 0:
frames.append(current)
return [b"".join(parts) for parts in frames]
# Function to convert binary frame data into images
def image16(frame, width, height, pixelformat=">H"):
return [
Image.fromarray(np.frombuffer(frame, dtype=pixelformat).reshape(width, height))
for frame in frames
if len(frame) == 2 * width * height
]
# Get frames and convert them to images
frames = getframes(df)
images = image16(frames, 384, 288)
# Create the directory for frames if not exists
frame_dir = f"frames/{basename}"
if not os.path.exists(frame_dir):
os.makedirs(frame_dir)
# Save each image as a PNG file
for i, img in enumerate(tqdm(images)):
img.save(f'frames/{basename}/{basename}_{i:04}.png')
# Produce a video from the saved images
ffmpeg_input = f"frames/{basename}/{basename}_%04d.png"
command = [
"ffmpeg",
"-y", # Overwrite output file without asking
"-hide_banner", # Hide banner
"-loglevel", "info", # Log level
"-f", "image2", # Input format
"-framerate", "25", # Framerate
"-i", ffmpeg_input, # Input file pattern
"-vf", "transpose=1", # Video filter for transposing
"-s", "384x288", # Size of one frame
"-vcodec", "libx264", # Video codec
"-pix_fmt", "yuv420p", # Pixel format: YUV 4:2:0
"thermal.mp4", # Output file in MP4 container
]
subprocess.run(command)
print("to play: ffplay thermal.mp4")