thermalcam_decoder/decode.py

219 lines
5.7 KiB
Python

#!/usr/bin/env python3
import argparse
from pathlib import Path
import os
import subprocess
from io import BytesIO
import numpy as np
from tqdm import tqdm
from datetime import datetime
import pandas as pd
import pcapng
from struct import unpack
from PIL import Image
# Create the parser
parser = argparse.ArgumentParser(description="Process a pcap file.")
parser.add_argument("--live", action="store_true", help="Process images live")
# Add an argument for the pcap file, with a default value
parser.add_argument('input_file', nargs='?', default='in.pcap', help='The pcap file to process')
# Parse the arguments
args = parser.parse_args()
# TODO - probably a better way to do this
def live_capture_cb(cb):
def outer(pkt):
data = bytes(pkt)
l = len(data)
if l == 6972:
cb(data)
scapy.all.sniff(iface="enp1s0f0", filter='udp', prn=outer)
def rightsize(it):
for i, obj in enumerate(it):
if isinstance(obj, bytes):
l = len(obj)
data = obj
else:
if not hasattr(obj, 'packet_len'):
continue
l = obj.packet_len
data = obj.packet_data
if l != 6972:
continue
yield data
def removestart(it):
"Remove the UDP header from the packets"
for x in it:
yield removestart_inner(x)
def removestart_inner(x):
return x[0x2A:]
# Function to parse packet data
def parse(data):
hdr = 4 + 2 * 7 # Header length
# Unpack data into variables
c1, c2, part, a, ffaa, b, c, d = unpack(">Lhhhhhhh", data[:hdr])
ret = locals()
del ret["data"]
del ret["hdr"]
ret["data"] = data[hdr:]
return ret
def parsed(it):
for x in it:
yield parse(x)
class FrameCollector:
def __init__(self):
self.current = []
def handle(self, obj):
ret = None
if obj['part'] == 0:
if len(self.current) > 0:
ret = b"".join(self.current)
self.current = []
#otherdata = []
self.current.append(obj["data"])
return ret
#otherdata.append(obj)
def last(self):
if len(self.current) > 0:
return b"".join(current)
return None
# Function to group data into frames
def frames(it):
handler = FrameCollector()
#otherdata = []
for obj in it:
ret = handler.handle(obj)
if ret:
yield ret
last = handler.last()
if last:
yield last
WIDTH = 384
HEIGHT = 288
def bad_frame(frame, width=WIDTH, height=HEIGHT):
return len(frame) != width * height * 2 # 16 bpp
def skip_bad_frames(it, width=WIDTH, height=HEIGHT):
for frame in it:
if bad_frame(frame): # 16 bpp
# Will be fixed when we stopped doing restarts
#print(f'{len(frame)} != {width} * {height} * 2')
continue
yield frame
def iterimages(it, width=WIDTH, height=HEIGHT, pixelformat=">H"):
for frame in it:
yield Image.fromarray(np.frombuffer(frame, dtype=pixelformat).reshape(width, height))
def process_video():
# Now use args.input_file as the file to process
input_file = args.input_file
basename = os.path.splitext(os.path.basename(input_file))[0]
stream = open(input_file, 'rb')
# Read packets from a pcap file
scanner = pcapng.scanner.FileScanner(stream)
blocks = tqdm(scanner)
# Get frames and convert them to images
frames = skip_bad_frames(frames(parsed(removestart(rightsize(blocks)))))
# Create the directory for frames if not exists
frame_dir = f"frames/{basename}"
if not os.path.exists(frame_dir):
os.makedirs(frame_dir)
# Save each image as a PNG file
images = iterimages(it=frames)
for i, img in enumerate(images):
img.save(f'frames/{basename}/{basename}_{i:04}.png')
ffmpeg_input = f"frames/{basename}/{basename}_%04d.png"
command = [
"ffmpeg",
"-y", # Overwrite output file without asking
"-hide_banner", # Hide banner
"-loglevel", "info", # Log level
"-f", "image2", # Input format
"-framerate", "25", # Framerate
"-i", ffmpeg_input, # Input file pattern
"-vf", "transpose=1", # Video filter for transposing
"-s", "384x288", # Size of one frame
"-vcodec", "libopenh264", # Video codec
"-pix_fmt", "yuv420p", # Pixel format: YUV 4:2:0
"thermal.mp4", # Output file in MP4 container
]
subprocess.run(command)
print("to play: ffplay thermal.mp4")
if args.live:
# TODO: to video via ffmpeg; right now just a single png
# of the last frame
def todo_live_ffmpeg():
output = 'to_ffmpeg'
# live: write to named pipe
if not Path(output).exists():
print(f'making fifo at {output}')
os.mkfifo(output)
fd = open(output, 'wb')
for frame in frames:
fd.write(frame)
print('live stream, import scapy')
import scapy.all
print('open stream')
class PacketHandler:
def __init__(self, cb):
self.frame_collector = FrameCollector()
self.cb = cb
def handle(self, pkt):
pkt = removestart_inner(pkt)
parsed = parse(pkt)
frame_maybe = self.frame_collector.handle(parsed)
if not frame_maybe or bad_frame(frame_maybe):
return
self.cb(frame_maybe)
progress = tqdm()
def on_frame(frame):
progress.update(1)
Image.fromarray(np.frombuffer(frame, dtype='>H').reshape(WIDTH, HEIGHT)).save(f'live.new.png')
os.rename('live.new.png', 'live.png')
handler = PacketHandler(on_frame)
live_capture_cb(handler.handle)
else:
process_video()