thermalcam_decoder/decode.py

104 lines
3.1 KiB
Python
Raw Normal View History

2023-12-25 23:09:03 +02:00
# coding: utf-8
from os import system
import numpy as np
from tqdm import tqdm
import pandas as pd
import pcapng
from struct import unpack
import matplotlib.pyplot as plt
from PIL import Image
from PIL import Image, ImageDraw
import cv2
pd.options.display.max_colwidth = 150
scanner = pcapng.scanner.FileScanner(open('in.pcap', 'rb'))
blocks = list(tqdm(scanner))
def tryget(obj, att):
if hasattr(obj, att):
return getattr(obj, att)
return None
df = pd.DataFrame([{'index': i, 'length': tryget(obj, 'packet_len')} for i, obj in enumerate(blocks)])
# get udp packets
data = [blocks[i] for i in df[df.length == 6972.0].index]
# remove udp header
raw = [d.packet_data[0x2a:] for d in data]
def parse(data):
hdr = 4 + 2 * 7
c1, c2, part, a, ffaa, b, c, d = unpack('>Lhhhhhhh', data[:hdr])
ret = locals()
del ret['data']
del ret['hdr']
ret['data'] = data[hdr:]
return ret
df = pd.DataFrame([parse(d) for d in raw])
df2 = df[[c for c in df.columns if c != 'data']]
def getframes(df):
frames = []
current = []
for i, row in df.iterrows():
if row.part == 0:
# roll over
if len(current) > 0:
frames.append(current)
current = []
current.append(row['data'])
if len(current) > 0:
frames.append(current)
return [b''.join(parts) for parts in frames]
def image16(frame, width, height, pixelformat='>H'):
return [Image.fromarray(np.frombuffer(frame, dtype=pixelformat).reshape(width, height)) for frame in frames if len(frame) == 2 * width * height]
def showvideo(images):
# wip
videodims = images[0].size
fourcc = cv2.VideoWriter_fourcc(*'avc1')
video = cv2.VideoWriter("test.mp4",fourcc, 60,videodims)
img = Image.new('RGB', videodims, color = 'darkred')
#draw stuff that goes on every frame here
for i in range(0,60*60):
imtemp = img.copy()
# draw frame specific stuff here.
video.write(cv2.cvtColor(np.array(imtemp), cv2.COLOR_RGB2BGR))
video.release()
#def shape(frames, width, height, mode='RGB'):
# return [Image.frombytes('RGB', (width, height), frame) for frame in frames if len(frame) == width * height * 3]
frames = getframes(df)
images = image16(frames, 384, 288)
for i, img in enumerate(tqdm(images)):
img.save(f'{i:04}.png')
#frame = b''.join(df.iloc[:32]['data'])
#Image.frombytes('RGB', (288, 256), frame).show()
start_len = len(b'T=(-1.665884e-08)*X^4+(1.347094e-05)*X^3+(-4.396264e-03)*X^2+(9.506939e-01)*X+(-6.353247e+01)\r\n ')
# 250 bytes at start of frame
equations = {x[:250].decode().strip() for x in df[df.part == 0]['data']}
# seen only a single equation on all packets
assert len(equations) == 1
assert list(equations)[0] == 'T=(-1.665884e-08)*X^4+(1.347094e-05)*X^3+(-4.396264e-03)*X^2+(9.506939e-01)*X+(-6.353247e+01)'
# we have 6372 packets
assert df.shape[0] == 6372
# produce a video
system('ffmpeg -f image2 -framerate 25 -i %04d.png -s 288x384 thermal.avi')
print('to play: ffplay thermal.avi')