thermalcam_decoder/decode.py

106 lines
3.0 KiB
Python
Raw Normal View History

2023-12-25 23:09:03 +02:00
# coding: utf-8
from os import system
import numpy as np
from tqdm import tqdm
import pandas as pd
import pcapng
2023-12-26 15:34:22 +02:00
from struct import unpack
2023-12-25 23:09:03 +02:00
from PIL import Image
import cv2
# Let pandas print up to 150 characters per cell before truncating.
pd.options.display.max_colwidth = 150
# Read every block of the capture eagerly. The context manager closes the
# file handle once the scanner has been fully drained into `blocks`.
with open('in.pcap', 'rb') as capture:
    scanner = pcapng.scanner.FileScanner(capture)
    blocks = list(tqdm(scanner))
def tryget(obj, att):
    """Return attribute ``att`` of ``obj``, or ``None`` if it has no such attribute."""
    return getattr(obj, att, None)
2023-12-26 15:34:22 +02:00
2023-12-25 23:09:03 +02:00
# Tabulate each capture block's position and its `packet_len` (None for
# blocks that do not carry that attribute).
df = pd.DataFrame(
    [{'index': i, 'length': tryget(obj, 'packet_len')} for i, obj in enumerate(blocks)]
)
# get udp packets: keep only the blocks whose packet length is exactly 6972
is_payload = df.length == 6972.0
data = [blocks[i] for i in df.index[is_payload]]
# remove udp header: drop the first 0x2a (42) bytes of every packet
# (presumably Ethernet + IPv4 + UDP headers -- TODO confirm)
raw = [d.packet_data[0x2a:] for d in data]
def parse(data):
    """Split one payload into its fixed-size header fields and trailing bytes.

    The first 18 bytes are decoded big-endian as one unsigned 32-bit value
    (``c1``) followed by seven signed 16-bit values (``c2``, ``part``, ``a``,
    ``ffaa``, ``b``, ``c``, ``d``).  Everything after the header is returned
    untouched under the ``'data'`` key.

    Args:
        data: bytes-like payload, at least 18 bytes long.

    Returns:
        dict with the keys ``c1, c2, part, a, ffaa, b, c, d, data`` in that
        order.

    Raises:
        struct.error: if ``data`` is shorter than the 18-byte header.
    """
    hdr = 4 + 2 * 7  # byte size of the '>Lhhhhhhh' header
    fields = ('c1', 'c2', 'part', 'a', 'ffaa', 'b', 'c', 'd')
    # Build the result explicitly instead of mutating the mapping returned by
    # locals(), whose contents and mutability are interpreter-dependent.
    ret = dict(zip(fields, unpack('>Lhhhhhhh', data[:hdr])))
    ret['data'] = data[hdr:]
    return ret
2023-12-26 15:34:22 +02:00
2023-12-25 23:09:03 +02:00
# One row per payload: the parsed header fields plus the remaining 'data' bytes.
df = pd.DataFrame(list(map(parse, raw)))
# The same table with every column except 'data'.
df2 = df.loc[:, df.columns != 'data']
def getframes(df):
    """Reassemble per-packet payloads into whole frames.

    Rows are walked in order.  A row whose ``part`` equals 0 starts a new
    frame, and the ``data`` of every row is appended to the frame currently
    being built.

    Args:
        df: DataFrame with a ``part`` column and a ``data`` column of bytes.

    Returns:
        A list of bytes objects, one per frame, each the concatenation of the
        frame's ``data`` chunks.
    """
    assembled = []
    chunks = []
    for _, packet in df.iterrows():
        # part == 0 marks a roll-over; flush what has been collected so far.
        if packet.part == 0 and chunks:
            assembled.append(b''.join(chunks))
            chunks = []
        chunks.append(packet['data'])
    # Flush the trailing frame, which has no following part == 0 row to close it.
    if chunks:
        assembled.append(b''.join(chunks))
    return assembled
def image16(frame, width, height, pixelformat='>H'):
    """Convert raw frame buffers into PIL images, skipping wrong-sized frames.

    Args:
        frame: iterable of bytes objects, one per frame.  (The singular name
            is kept so existing callers keep working.)
        width: first dimension passed to ``reshape``.
        height: second dimension passed to ``reshape``.
        pixelformat: numpy dtype string used to decode each buffer; defaults
            to big-endian unsigned 16-bit.

    Returns:
        A list of ``PIL.Image`` objects, one per buffer whose length is
        exactly ``2 * width * height`` bytes.  Other buffers are silently
        dropped.
    """
    # Iterate the argument itself rather than a module-level `frames` global.
    # NOTE(review): reshape(width, height) gives an array with `width` rows;
    # the later ffmpeg step applies a transpose -- confirm the orientation.
    return [
        Image.fromarray(np.frombuffer(buf, dtype=pixelformat).reshape(width, height))
        for buf in frame
        if len(buf) == 2 * width * height
    ]
2023-12-25 23:09:03 +02:00
def showvideo(images):
    """Write a placeholder video sized like the first image (work in progress).

    Only ``images[0].size`` is used at the moment: the output "test.mp4" is
    60 * 60 identical dark-red frames written at 60 fps with the 'avc1'
    fourcc.  The image contents are not drawn yet.

    Args:
        images: non-empty sequence of PIL images.

    Raises:
        IndexError: if ``images`` is empty.
    """
    # wip
    videodims = images[0].size
    fourcc = cv2.VideoWriter_fourcc(*'avc1')
    video = cv2.VideoWriter("test.mp4", fourcc, 60, videodims)
    try:
        img = Image.new('RGB', videodims, color='darkred')
        # draw stuff that goes on every frame here
        for _ in range(60 * 60):
            imtemp = img.copy()
            # draw frame specific stuff here.
            # PIL arrays are RGB; OpenCV expects BGR channel order.
            video.write(cv2.cvtColor(np.array(imtemp), cv2.COLOR_RGB2BGR))
    finally:
        # Always release the writer, even if a frame fails, so the file is closed.
        video.release()
2023-12-26 15:34:22 +02:00
# def shape(frames, width, height, mode='RGB'):
# return [Image.frombytes('RGB', (width, height), frame) for frame in frames if len(frame) == width * height * 3]
2023-12-25 23:09:03 +02:00
# Reassemble the packet payloads into frames and decode them as 16-bit images.
frames = getframes(df)
images = image16(frames, width=384, height=288)
# Save every decoded image as a zero-padded, sequentially numbered PNG in the
# current directory.
for i, img in enumerate(tqdm(images)):
    img.save(f'{i:04}.png')
2023-12-26 15:34:22 +02:00
# Earlier experiment, kept for reference: join the first 32 payloads and view
# them as a single 288x256 RGB image.
# frame = b''.join(df.iloc[:32]['data'])
# Image.frombytes('RGB', (288, 256), frame).show()
# Byte length of the `T=...` polynomial text line, including its trailing \r\n.
start_len = len(b'T=(-1.665884e-08)*X^4+(1.347094e-05)*X^3+(-4.396264e-03)*X^2+(9.506939e-01)*X+(-6.353247e+01)\r\n')
2023-12-25 23:09:03 +02:00
2023-12-26 15:38:54 +02:00
# 250 bytes at start of frame: decode that prefix of every part == 0 payload
# and collect the distinct stripped strings.
equations = {payload[:250].decode().strip() for payload in df.loc[df.part == 0, 'data']}
# seen only a single equation on all packets
assert len(equations) == 1
assert next(iter(equations)) == 'T=(-1.665884e-08)*X^4+(1.347094e-05)*X^3+(-4.396264e-03)*X^2+(9.506939e-01)*X+(-6.353247e+01)'
# we have 6372 packets
assert len(df) == 6372
# produce a video from the numbered PNG files using ffmpeg
system('ffmpeg -f image2 -framerate 25 -i %04d.png -vf "transpose=1" -s 384x288 -vcodec libx264 -pix_fmt yuv420p thermal.mp4')
print('to play: ffplay thermal.mp4')