HedgehogTrack/recorder.py
2025-06-17 22:04:02 +02:00

93 lines
3.2 KiB
Python

import serial
import time
import pickle
from datetime import datetime
from pathlib import Path
from parser_mmw_demo import parser_one_mmw_demo_output_packet
def configure_sensor(config_file: Path, config_device: serial.Serial):
with config_file.open(mode="rt", encoding="utf-8") as config_fp:
for line in config_fp:
line = line.rstrip("\r\n")
print(f"[CONFIG] writing: '{line}'")
config_device.write((line + "\n").encode())
time.sleep(0.01)
# parser_one_mmw_demo_output_packet extracts only one complete frame at a time
# so call this in a loop till end of file
def parse_frames(data_device: serial.Serial, record_file: Path = Path("sensor_recording.bin")):
sensor_data = bytearray()
bytes_parsed = 0
with record_file.open(mode="wb") as fp:
while True:
read_data = data_device.read(256)
sensor_data += bytearray(read_data)
# parser_one_mmw_demo_output_packet function already prints the
# parsed data to stdio. So showcasing only saving the data to arrays
# here for further custom processing
parser_result, \
headerStartIndex, \
totalPacketNumBytes, \
numDetObj, \
numTlv, \
subFrameNumber, \
detectedX_array, \
detectedY_array, \
detectedZ_array, \
detectedV_array, \
detectedRange_array, \
detectedAzimuth_array, \
detectedElevation_array, \
detectedSNR_array, \
detectedNoise_array = parser_one_mmw_demo_output_packet(sensor_data[bytes_parsed::1], len(sensor_data) - bytes_parsed)
pickle.dump({
"timestamp": datetime.now(),
"parser_result": parser_result,
"header_start_index": headerStartIndex,
"total_packet_size": totalPacketNumBytes,
"detected_objects": numDetObj,
"tlv_count": numTlv,
"sub_frame_number": subFrameNumber,
"detected_points": (detectedX_array,
detectedY_array,
detectedZ_array,
detectedV_array,
detectedRange_array,
detectedAzimuth_array,
detectedElevation_array,
detectedSNR_array,
detectedNoise_array),
}, fp)
# Check the parser result
print ("Parser result: ", parser_result)
if (parser_result == 0):
bytes_parsed += (headerStartIndex+totalPacketNumBytes)
print("totalBytesParsed: ", bytes_parsed)
def main():
config_device = serial.Serial('/dev/ttyACM0', 115200)
data_device = serial.Serial('/dev/ttyACM1', 921600)
configure_sensor(Path("hedgehog.cfg"), config_device)
try:
parse_frames(data_device)
except KeyboardInterrupt:
pass
finally:
config_device.write("sensorStop\n".encode())
if __name__ == "__main__":
main()