import serial import time import datetime import pickle from pathlib import Path from parser_mmw_demo import parser_one_mmw_demo_output_packet def configure_sensor(config_file: Path, config_device: serial.Serial): with config_file.open(mode="rt", encoding="utf-8") as config_fp: for line in config_fp: line = line.rstrip("\r\n") print(f"[CONFIG] writing: '{line}'") config_device.write((line + "\n").encode()) time.sleep(0.01) # parser_one_mmw_demo_output_packet extracts only one complete frame at a time # so call this in a loop till end of file def parse_frames(data_device: serial.Serial, record_file: Path = Path("sensor_recording.bin")): sensor_data = bytearray() bytes_parsed = 0 with record_file.open(mode="wb") as fp: while True: read_data = data_device.read(256) sensor_data += bytearray(read_data) # parser_one_mmw_demo_output_packet function already prints the # parsed data to stdio. So showcasing only saving the data to arrays # here for further custom processing parser_result, \ headerStartIndex, \ totalPacketNumBytes, \ numDetObj, \ numTlv, \ subFrameNumber, \ detectedX_array, \ detectedY_array, \ detectedZ_array, \ detectedV_array, \ detectedRange_array, \ detectedAzimuth_array, \ detectedElevation_array, \ detectedSNR_array, \ detectedNoise_array = parser_one_mmw_demo_output_packet(sensor_data[bytes_parsed::1], len(sensor_data) - bytes_parsed) pickle.dump({ "timestamp": datetime.now(), "parser_result": parser_result, "header_start_index": headerStartIndex, "total_packet_size": totalPacketNumBytes, "detected_objects": numDetObj, "tlv_count": numTlv, "sub_frame_number": subFrameNumber, "detected_points": (detectedX_array, detectedY_array, detectedZ_array, detectedV_array, detectedRange_array, detectedAzimuth_array, detectedElevation_array, detectedSNR_array, detectedNoise_array), }, fp) # Check the parser result print ("Parser result: ", parser_result) if (parser_result == 0): bytes_parsed += (headerStartIndex+totalPacketNumBytes) print("totalBytesParsed: ", bytes_parsed) def main(): config_device = serial.Serial('/dev/ttyACM0', 115200) data_device = serial.Serial('/dev/ttyACM1', 921600) configure_sensor(Path("hedgehog.cfg"), config_device) try: parse_frames(data_device) except: config_device.write("sensorStop\n".encode()) if __name__ == "__main__": try: main() except KeyboardInterrupt: exit(0)