Possible implementation of Circuitpython Library to use with Groove Vision AI V2

I’m doing an experiment, creating a class to communicate between a Xiao board with Circuitpython and GrooveVision AI V2, but I haven’t been able to communicate successfully yet.

I initially tried the UART protocol, because the list of commands available at this link SSCMA-Micro/docs/protocol/at_protocol.md at dev · Seeed-Studio/SSCMA-Micro · GitHub gave me to understand that the communication was done this way.

Using UART I get the message that TX is in use.
Using I2C I get null message.
Using SPI, I get the message that CS is in use.

'''
    Circuitpython SSCMA Test Library
    
    Implement this command list:
    
    AT+MQTTPUBSUB? 	Get current MQTT publish and subscribe topic
    AT+MQTTSERVER? 	Get current MQTT server status and config
    AT+MQTTSERVERSTA=<STA> Set MQTT server status
    AT+MQTTSERVER=<"CLIENT_ID","ADDRESS",PORT,"USERNAME","PASSWORD",USE_SSL> Set and connect to a MQTT server
    AT+WIFI? Get current Wi-Fi status and config
    AT+WIFIVER? Get Wi-Fi version
    AT+WIFIVER=<VER> Set Wi-Fi version
    AT+WIFIIN6=<"IP"> Set IPv6 info     
    AT+WIFIIN4=<"IP","NETMASK","GATEWAY"> Set IPv4 info
    AT+WIFISTA=<STA> Set Wi-Fi status    
    AT+WIFI=<"NAME",SECURITY,"PASSWORD"> Set and connect to a Wi-Fi      
    AT+INFO? Get info string from device flash         
    AT+INFO=<"INFO_STRING">  Store info string to device flash     
    AT+ACTION? Get action trigger         
    AT+ACTION=<"EXPRESSION"> Set action trigger
    AT+INVOKE? Get invoke task status
    AT+INVOKE=<N_TIMES,DIFFERED,RESULT_ONLY> Invoke for N times (-1 for infinity loop)
    AT+SAMPLE? Get sample task status
    AT+SAMPLE=<N_TIMES> Sample data from current sensor
    AT+SENSOR? Get current sensor info
    AT+SENSOR=<SENSOR_ID,ENABLE/DISABLE,OPT_ID>  Set a default sensor by sensor ID
    AT+SENSORS? Get available sensors 
    AT+ALGO?Get current algorithm info     
    AT+ALGO=<ALGORITHM_ID> Set a algorithm by algorithm type ID
    AT+ALGOS? Get available algorithms    
    AT+MODEL? Get current model info     
    AT+MODEL=<MODEL_ID> Load a model by model ID          
    AT+MODELS? Get available models 
    AT+LED=<ENABLE/DISABLE> Set LED status        
    AT+YIELD=<TIME_S>
'''

import time, board, busio, json

# This Library still is not functional. Get TX in use, and can't create UART

class ATDevice:
    def __init__(self, uart):
        self.uart = uart
        self._msg_buffer = b''
        self.listeners = []

    def send_command(self, command, tag=None, timeout=1):
        if tag:
            full_command = f'AT+{tag}@{command}\r'
        else:
            full_command = f'AT+{command}\r'
        self.uart.write(full_command.encode('utf-8'))
        time.sleep(timeout)
        return self._read_response()

    def _read_response(self):
        response = []
        while self.uart.in_waiting > 0:
            response.append(self.uart.readline().decode('utf-8').strip())
        response = ''.join(response)
        return self._parse_json(response)

    def _parse_json(self, response):
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            print("Failed to decode JSON response.")
            return None

    def _handle_response(self, response):
        if response is None:
            return "Invalid JSON response"
        
        response_type = response.get("type", None)
        command_name = response.get("name", None)
        response_code = response.get("code", None)
        data = response.get("data", None)
        
        if response_type is None or command_name is None or response_code is None:
            return "Malformed response"
        
        if response_code == 0:
            return data
        else:
            return f"Error {response_code}: {self._get_error_message(response_code)}"
    
    def _get_error_message(self, code):
        error_messages = {
            0: "Success",
            1: "Try again",
            2: "Logic error",
            3: "Timeout",
            4: "IO error",
            5: "Invalid argument",
            6: "Out of memory",
            7: "Busy",
            8: "Not supported",
            9: "Operation not permitted"
        }
        return error_messages.get(code, "Unknown error")

    # Command functions
    def get_mqtt_pubsub(self):
        response = self.send_command('MQTTPUBSUB?')
        return self._handle_response(response)

    def get_mqtt_server(self):
        response = self.send_command('MQTTSERVER?')
        return self._handle_response(response)

    def set_mqtt_server_status(self, status):
        response = self.send_command(f'MQTTSERVERSTA={status}')
        return self._handle_response(response)

    def set_mqtt_server(self, client_id, address, port, username, password, use_ssl):
        response = self.send_command(f'MQTTSERVER="{client_id}","{address}",{port},"{username}","{password}",{use_ssl}')
        return self._handle_response(response)

    def get_wifi_status(self):
        response = self.send_command('WIFI?')
        return self._handle_response(response)

    def get_wifi_version(self):
        response = self.send_command('WIFIVER?')
        return self._handle_response(response)

    def set_wifi_version(self, version):
        response = self.send_command(f'WIFIVER={version}')
        return self._handle_response(response)

    def set_wifi_ipv6(self, ip):
        response = self.send_command(f'WIFIIN6="{ip}"')
        return self._handle_response(response)

    def set_wifi_ipv4(self, ip, netmask, gateway):
        response = self.send_command(f'WIFIIN4="{ip}","{netmask}","{gateway}"')
        return self._handle_response(response)

    def set_wifi_status(self, status):
        response = self.send_command(f'WIFISTA={status}')
        return self._handle_response(response)

    def connect_wifi(self, name, security, password):
        response = self.send_command(f'WIFI="{name}",{security},"{password}"')
        return self._handle_response(response)

    def get_device_info(self):
        response = self.send_command('INFO?')
        return self._handle_response(response)

    def set_device_info(self, info_string):
        response = self.send_command(f'INFO="{info_string}"')
        return self._handle_response(response)

    def get_action_trigger(self):
        response = self.send_command('ACTION?')
        return self._handle_response(response)

    def set_action_trigger(self, expression):
        response = self.send_command(f'ACTION="{expression}"')
        return self._handle_response(response)

    def get_invoke_status(self):
        response = self.send_command('INVOKE?')
        return self._handle_response(response)

    def invoke_task(self, n_times, differed, result_only):
        response = self.send_command(f'INVOKE={n_times},{differed},{result_only}')
        return self._handle_response(response)

    def get_sample_status(self):
        response = self.send_command('SAMPLE?')
        return self._handle_response(response)

    def sample_data(self, n_times):
        response = self.send_command(f'SAMPLE={n_times}')
        return self._handle_response(response)

    def get_sensor_info(self):
        response = self.send_command('SENSOR?')
        return self._handle_response(response)

    def set_sensor(self, sensor_id, enable, opt_id):
        response = self.send_command(f'SENSOR={sensor_id},{enable},{opt_id}')
        return self._handle_response(response)

    def get_available_sensors(self):
        response = self.send_command('SENSORS?')
        return self._handle_response(response)

    def get_algorithm_info(self):
        response = self.send_command('ALGO?')
        return self._handle_response(response)

    def set_algorithm(self, algorithm_id):
        response = self.send_command(f'ALGO={algorithm_id}')
        return self._handle_response(response)

    def get_available_algorithms(self):
        response = self.send_command('ALGOS?')
        return self._handle_response(response)

    def get_model_info(self):
        response = self.send_command('MODEL?')
        return self._handle_response(response)

    def set_model(self, model_id):
        response = self.send_command(f'MODEL={model_id}')
        return self._handle_response(response)

    def get_available_models(self):
        response = self.send_command('MODELS?')
        return self._handle_response(response)

    def set_led_status(self, status):
        response = self.send_command(f'LED={status}')
        return self._handle_response(response)

    def yield_time(self, time_s):
        response = self.send_command(f'YIELD={time_s}')
        return self._handle_response(response)

    def _handle_incoming_data(self, data):
        self._msg_buffer += data
        matches = re.findall(b'\r{.*}\n', self._msg_buffer)
        for match in matches:
            try:
                payload = json.loads(match.decode('utf-8'))
                if "type" in payload:
                    for listener in self.listeners:
                        if listener["name"] == payload["name"]:
                            listener["response"] = payload
                            listener["event"].set()
            except Exception as ex:
                print(f"Failed to handle incoming data: {ex}")
            finally:
                self._msg_buffer = self._msg_buffer[self._msg_buffer.find(b'\n')+1:]

# Exemplo de uso:
uart = busio.UART(board.TX, board.RX, baudrate=921600)
device = ATDevice(uart)

# Obter status atual do Wi-Fi
print(device.get_wifi_status())

I also confirm that the above commands work via serial terminal, with USB connection, using Minicom, Putty or Tera Term.

The command list gives us the impression that it would be possible to switch between models installed on the board, but this does not appear to have been implemented via Sensecraft AI.