thermalcam_decoder/decode.py

219 lines
5.7 KiB
Python
Raw Normal View History

2023-12-31 13:40:47 +02:00
#!/usr/bin/env python3
2023-12-29 02:26:24 +02:00
import argparse
from pathlib import Path
2023-12-29 02:26:24 +02:00
import os
import subprocess
from io import BytesIO
2023-12-25 23:09:03 +02:00
import numpy as np
from tqdm import tqdm
from datetime import datetime
2023-12-25 23:09:03 +02:00
import pandas as pd
import pcapng
2023-12-26 15:34:22 +02:00
from struct import unpack
2023-12-25 23:09:03 +02:00
from PIL import Image
2023-12-29 02:26:24 +02:00
# Create the parser
parser = argparse.ArgumentParser(description="Process a pcap file.")
parser.add_argument("--live", action="store_true", help="Process images live")
2023-12-29 02:26:24 +02:00
# Add an argument for the pcap file, with a default value
parser.add_argument('input_file', nargs='?', default='in.pcap', help='The pcap file to process')
# Parse the arguments
args = parser.parse_args()
2023-12-26 15:34:22 +02:00
# TODO - probably a better way to do this
def live_capture_cb(cb):
def outer(pkt):
data = bytes(pkt)
l = len(data)
if l == 6972:
cb(data)
scapy.all.sniff(iface="enp1s0f0", filter='udp', prn=outer)
2023-12-25 23:09:03 +02:00
2023-12-26 16:35:32 +02:00
2023-12-31 13:40:47 +02:00
def rightsize(it):
for i, obj in enumerate(it):
if isinstance(obj, bytes):
l = len(obj)
data = obj
else:
if not hasattr(obj, 'packet_len'):
continue
l = obj.packet_len
data = obj.packet_data
if l != 6972:
2023-12-31 13:40:47 +02:00
continue
yield data
2023-12-26 16:35:32 +02:00
2023-12-31 13:40:47 +02:00
def removestart(it):
"Remove the UDP header from the packets"
for x in it:
yield removestart_inner(x)
def removestart_inner(x):
return x[0x2A:]
2023-12-31 13:40:47 +02:00
2023-12-25 23:09:03 +02:00
2023-12-26 16:35:32 +02:00
# Function to parse packet data
2023-12-25 23:09:03 +02:00
def parse(data):
2023-12-26 16:35:32 +02:00
hdr = 4 + 2 * 7 # Header length
# Unpack data into variables
2023-12-29 02:26:24 +02:00
c1, c2, part, a, ffaa, b, c, d = unpack(">Lhhhhhhh", data[:hdr])
2023-12-25 23:09:03 +02:00
ret = locals()
2023-12-29 02:26:24 +02:00
del ret["data"]
del ret["hdr"]
ret["data"] = data[hdr:]
2023-12-25 23:09:03 +02:00
return ret
2023-12-26 15:34:22 +02:00
2023-12-31 13:40:47 +02:00
def parsed(it):
for x in it:
yield parse(x)
2023-12-25 23:09:03 +02:00
class FrameCollector:
def __init__(self):
self.current = []
def handle(self, obj):
ret = None
if obj['part'] == 0:
if len(self.current) > 0:
ret = b"".join(self.current)
self.current = []
#otherdata = []
self.current.append(obj["data"])
return ret
#otherdata.append(obj)
def last(self):
if len(self.current) > 0:
return b"".join(current)
return None
2023-12-26 16:35:32 +02:00
# Function to group data into frames
2023-12-31 13:40:47 +02:00
def frames(it):
handler = FrameCollector()
#otherdata = []
2023-12-31 13:40:47 +02:00
for obj in it:
ret = handler.handle(obj)
if ret:
yield ret
last = handler.last()
if last:
yield last
2023-12-25 23:09:03 +02:00
WIDTH = 384
HEIGHT = 288
2023-12-25 23:09:03 +02:00
def bad_frame(frame, width=WIDTH, height=HEIGHT):
return len(frame) != width * height * 2 # 16 bpp
def skip_bad_frames(it, width=WIDTH, height=HEIGHT):
2023-12-31 13:40:47 +02:00
for frame in it:
if bad_frame(frame): # 16 bpp
# Will be fixed when we stopped doing restarts
#print(f'{len(frame)} != {width} * {height} * 2')
2023-12-31 13:40:47 +02:00
continue
yield frame
def iterimages(it, width=WIDTH, height=HEIGHT, pixelformat=">H"):
for frame in it:
2023-12-31 13:40:47 +02:00
yield Image.fromarray(np.frombuffer(frame, dtype=pixelformat).reshape(width, height))
2023-12-25 23:09:03 +02:00
def process_video():
# Now use args.input_file as the file to process
input_file = args.input_file
basename = os.path.splitext(os.path.basename(input_file))[0]
stream = open(input_file, 'rb')
# Read packets from a pcap file
scanner = pcapng.scanner.FileScanner(stream)
blocks = tqdm(scanner)
# Get frames and convert them to images
frames = skip_bad_frames(frames(parsed(removestart(rightsize(blocks)))))
2023-12-26 16:35:32 +02:00
# Create the directory for frames if not exists
frame_dir = f"frames/{basename}"
if not os.path.exists(frame_dir):
os.makedirs(frame_dir)
2023-12-29 02:26:24 +02:00
# Save each image as a PNG file
images = iterimages(it=frames)
for i, img in enumerate(images):
img.save(f'frames/{basename}/{basename}_{i:04}.png')
ffmpeg_input = f"frames/{basename}/{basename}_%04d.png"
command = [
"ffmpeg",
"-y", # Overwrite output file without asking
"-hide_banner", # Hide banner
"-loglevel", "info", # Log level
"-f", "image2", # Input format
"-framerate", "25", # Framerate
"-i", ffmpeg_input, # Input file pattern
"-vf", "transpose=1", # Video filter for transposing
"-s", "384x288", # Size of one frame
"-vcodec", "libopenh264", # Video codec
"-pix_fmt", "yuv420p", # Pixel format: YUV 4:2:0
"thermal.mp4", # Output file in MP4 container
]
subprocess.run(command)
print("to play: ffplay thermal.mp4")
if args.live:
# TODO: to video via ffmpeg; right now just a single png
# of the last frame
def todo_live_ffmpeg():
output = 'to_ffmpeg'
# live: write to named pipe
if not Path(output).exists():
print(f'making fifo at {output}')
os.mkfifo(output)
fd = open(output, 'wb')
for frame in frames:
fd.write(frame)
print('live stream, import scapy')
import scapy.all
print('open stream')
class PacketHandler:
def __init__(self, cb):
self.frame_collector = FrameCollector()
self.cb = cb
def handle(self, pkt):
pkt = removestart_inner(pkt)
parsed = parse(pkt)
frame_maybe = self.frame_collector.handle(parsed)
if not frame_maybe or bad_frame(frame_maybe):
return
self.cb(frame_maybe)
progress = tqdm()
def on_frame(frame):
progress.update(1)
2024-02-15 00:30:29 +02:00
Image.fromarray(np.frombuffer(frame, dtype='>H').reshape(WIDTH, HEIGHT)).save(f'live.new.png')
os.rename('live.new.png', 'live.png')
handler = PacketHandler(on_frame)
live_capture_cb(handler.handle)
else:
process_video()