I’m experimenting with a class for communication between a XIAO board running CircuitPython and the Grove Vision AI V2, but I haven’t been able to communicate successfully yet.
I initially tried UART, because the list of commands available at this link (SSCMA-Micro/docs/protocol/at_protocol.md at dev · Seeed-Studio/SSCMA-Micro · GitHub) led me to understand that the communication is done this way.
Using UART I get the message that TX is in use.
Using I2C, I get a null message.
Using SPI, I get the message that CS is in use.
'''
CircuitPython SSCMA test library.
Implements this command list:
AT+MQTTPUBSUB? Get current MQTT publish and subscribe topic
AT+MQTTSERVER? Get current MQTT server status and config
AT+MQTTSERVERSTA=<STA> Set MQTT server status
AT+MQTTSERVER=<"CLIENT_ID","ADDRESS",PORT,"USERNAME","PASSWORD",USE_SSL> Set and connect to a MQTT server
AT+WIFI? Get current Wi-Fi status and config
AT+WIFIVER? Get Wi-Fi version
AT+WIFIVER=<VER> Set Wi-Fi version
AT+WIFIIN6=<"IP"> Set IPv6 info
AT+WIFIIN4=<"IP","NETMASK","GATEWAY"> Set IPv4 info
AT+WIFISTA=<STA> Set Wi-Fi status
AT+WIFI=<"NAME",SECURITY,"PASSWORD"> Set and connect to a Wi-Fi
AT+INFO? Get info string from device flash
AT+INFO=<"INFO_STRING"> Store info string to device flash
AT+ACTION? Get action trigger
AT+ACTION=<"EXPRESSION"> Set action trigger
AT+INVOKE? Get invoke task status
AT+INVOKE=<N_TIMES,DIFFERED,RESULT_ONLY> Invoke for N times (-1 for infinity loop)
AT+SAMPLE? Get sample task status
AT+SAMPLE=<N_TIMES> Sample data from current sensor
AT+SENSOR? Get current sensor info
AT+SENSOR=<SENSOR_ID,ENABLE/DISABLE,OPT_ID> Set a default sensor by sensor ID
AT+SENSORS? Get available sensors
AT+ALGO? Get current algorithm info
AT+ALGO=<ALGORITHM_ID> Set an algorithm by algorithm type ID
AT+ALGOS? Get available algorithms
AT+MODEL? Get current model info
AT+MODEL=<MODEL_ID> Load a model by model ID
AT+MODELS? Get available models
AT+LED=<ENABLE/DISABLE> Set LED status
AT+YIELD=<TIME_S>
'''
import time, board, busio, json
# This library is not functional yet: creating the UART fails with "TX in use".
class ATDevice:
    """Send SSCMA AT commands over a UART-like transport and decode replies.

    The transport only needs ``write(bytes)``, ``readline()`` and an
    ``in_waiting`` attribute; ``reset_input_buffer()`` is used when present.
    Each public command method sends one AT command and returns the reply's
    ``data`` field on success, or a human-readable error string otherwise.
    """

    # Pause between UART polls while waiting for a reply, in seconds.
    _POLL_INTERVAL = 0.01

    # Human-readable text for the numeric ``code`` field of a reply.
    _ERROR_MESSAGES = {
        0: "Success",
        1: "Try again",
        2: "Logic error",
        3: "Timeout",
        4: "IO error",
        5: "Invalid argument",
        6: "Out of memory",
        7: "Busy",
        8: "Not supported",
        9: "Operation not permitted",
    }

    def __init__(self, uart):
        """Wrap *uart*, the transport used for every command."""
        self.uart = uart
        # Bytes fed to _handle_incoming_data() that do not yet form a line.
        self._msg_buffer = b''
        # Bytes read while waiting for a command reply that are not yet parsed.
        self._reply_buffer = b''
        # Dicts with "name", "event" and "response" keys; see
        # _handle_incoming_data() for how they are matched and signalled.
        self.listeners = []

    def send_command(self, command, tag=None, timeout=1):
        """Send ``AT+<command>`` and return the decoded reply.

        Args:
            command: Command body without the ``AT+`` prefix, e.g. ``'WIFI?'``.
            tag: Optional tag, sent as ``AT+<tag>@<command>``.
            timeout: Maximum number of seconds to wait for a reply.

        Returns:
            The first JSON object received as a dict, or ``None`` when no
            decodable reply arrives before the timeout expires.
        """
        # Drop leftovers of an earlier exchange so a late reply to a previous
        # command is not mistaken for the reply to this one.
        self._reply_buffer = b''
        reset_input = getattr(self.uart, 'reset_input_buffer', None)
        if reset_input is not None:
            reset_input()

        body = f'{tag}@{command}' if tag else command
        # NOTE(review): CR LF is used as the command terminator; confirm
        # against the AT protocol document for the firmware in use.
        self.uart.write(f'AT+{body}\r\n'.encode('utf-8'))

        # Poll rather than sleeping for the whole timeout, so the call
        # returns as soon as the reply is available.
        deadline = time.monotonic() + timeout
        while True:
            response = self._read_response()
            if response is not None:
                return response
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            time.sleep(min(self._POLL_INTERVAL, remaining))

    def _read_response(self):
        """Return the next JSON object available from the UART, or ``None``.

        Lines are parsed one at a time, so a reply followed by further
        messages still decodes. Incomplete trailing data is kept for the
        next call.
        """
        while self.uart.in_waiting > 0:
            chunk = self.uart.readline()
            if not chunk:
                # readline() yields None/b'' when it times out.
                break
            self._reply_buffer += chunk

        while True:
            end = self._reply_buffer.find(b'\n')
            if end < 0:
                break
            line = self._reply_buffer[:end]
            self._reply_buffer = self._reply_buffer[end + 1:]
            text = self._decode_text(line)
            if not text:
                continue
            payload = self._parse_json(text)
            if isinstance(payload, dict):
                return payload

        # A complete reply may arrive without a trailing newline; accept it
        # silently if it already decodes, otherwise wait for more bytes.
        text = self._decode_text(self._reply_buffer)
        if text:
            try:
                payload = json.loads(text)
            except ValueError:
                return None
            if isinstance(payload, dict):
                self._reply_buffer = b''
                return payload
        return None

    @staticmethod
    def _decode_text(raw):
        """Decode *raw* bytes as stripped UTF-8 text; '' if undecodable."""
        try:
            return raw.decode('utf-8').strip()
        except ValueError:
            # Unicode decode errors are ValueError subclasses.
            return ''

    def _parse_json(self, response):
        """Decode *response* as JSON; print a notice and return None on failure."""
        try:
            return json.loads(response)
        except ValueError:
            # ValueError rather than json.JSONDecodeError: the latter is a
            # ValueError subclass in CPython and is absent from
            # CircuitPython's json module.
            print("Failed to decode JSON response.")
            return None

    def _handle_response(self, response):
        """Turn a decoded reply into its ``data`` field or an error string.

        Returns:
            ``"Invalid JSON response"`` when *response* is not a dict,
            ``"Malformed response"`` when ``type``, ``name`` or ``code`` is
            missing, the ``data`` field when ``code`` is 0, and otherwise
            ``"Error <code>: <message>"``.
        """
        if not isinstance(response, dict):
            return "Invalid JSON response"
        response_code = response.get("code", None)
        if (
            response.get("type", None) is None
            or response.get("name", None) is None
            or response_code is None
        ):
            return "Malformed response"
        if response_code == 0:
            return response.get("data", None)
        return f"Error {response_code}: {self._get_error_message(response_code)}"

    def _get_error_message(self, code):
        """Return the description for *code*, or ``"Unknown error"``."""
        return self._ERROR_MESSAGES.get(code, "Unknown error")

    def _request(self, command):
        """Send *command* and interpret the reply with _handle_response()."""
        return self._handle_response(self.send_command(command))

    @staticmethod
    def _flag(value):
        """Render Python bools as 1/0; pass any other value through unchanged."""
        return int(value) if isinstance(value, bool) else value

    # Command functions
    def get_mqtt_pubsub(self):
        """Get the current MQTT publish and subscribe topic."""
        return self._request('MQTTPUBSUB?')

    def get_mqtt_server(self):
        """Get the current MQTT server status and config."""
        return self._request('MQTTSERVER?')

    def set_mqtt_server_status(self, status):
        """Set the MQTT server status."""
        return self._request(f'MQTTSERVERSTA={self._flag(status)}')

    def set_mqtt_server(self, client_id, address, port, username, password, use_ssl):
        """Set and connect to an MQTT server."""
        return self._request(
            f'MQTTSERVER="{client_id}","{address}",{port},'
            f'"{username}","{password}",{self._flag(use_ssl)}'
        )

    def get_wifi_status(self):
        """Get the current Wi-Fi status and config."""
        return self._request('WIFI?')

    def get_wifi_version(self):
        """Get the Wi-Fi version."""
        return self._request('WIFIVER?')

    def set_wifi_version(self, version):
        """Set the Wi-Fi version."""
        return self._request(f'WIFIVER={version}')

    def set_wifi_ipv6(self, ip):
        """Set the IPv6 info."""
        return self._request(f'WIFIIN6="{ip}"')

    def set_wifi_ipv4(self, ip, netmask, gateway):
        """Set the IPv4 info."""
        return self._request(f'WIFIIN4="{ip}","{netmask}","{gateway}"')

    def set_wifi_status(self, status):
        """Set the Wi-Fi status."""
        return self._request(f'WIFISTA={self._flag(status)}')

    def connect_wifi(self, name, security, password):
        """Set and connect to a Wi-Fi network."""
        return self._request(f'WIFI="{name}",{security},"{password}"')

    def get_device_info(self):
        """Get the info string from device flash."""
        return self._request('INFO?')

    def set_device_info(self, info_string):
        """Store an info string to device flash."""
        return self._request(f'INFO="{info_string}"')

    def get_action_trigger(self):
        """Get the action trigger."""
        return self._request('ACTION?')

    def set_action_trigger(self, expression):
        """Set the action trigger expression."""
        return self._request(f'ACTION="{expression}"')

    def get_invoke_status(self):
        """Get the invoke task status."""
        return self._request('INVOKE?')

    def invoke_task(self, n_times, differed, result_only):
        """Invoke *n_times* times (-1 for an infinite loop)."""
        return self._request(
            f'INVOKE={n_times},{self._flag(differed)},{self._flag(result_only)}'
        )

    def get_sample_status(self):
        """Get the sample task status."""
        return self._request('SAMPLE?')

    def sample_data(self, n_times):
        """Sample data from the current sensor *n_times* times."""
        return self._request(f'SAMPLE={n_times}')

    def get_sensor_info(self):
        """Get the current sensor info."""
        return self._request('SENSOR?')

    def set_sensor(self, sensor_id, enable, opt_id):
        """Set the default sensor by sensor ID."""
        return self._request(f'SENSOR={sensor_id},{self._flag(enable)},{opt_id}')

    def get_available_sensors(self):
        """Get the available sensors."""
        return self._request('SENSORS?')

    def get_algorithm_info(self):
        """Get the current algorithm info."""
        return self._request('ALGO?')

    def set_algorithm(self, algorithm_id):
        """Set the algorithm by algorithm type ID."""
        return self._request(f'ALGO={algorithm_id}')

    def get_available_algorithms(self):
        """Get the available algorithms."""
        return self._request('ALGOS?')

    def get_model_info(self):
        """Get the current model info."""
        return self._request('MODEL?')

    def set_model(self, model_id):
        """Load a model by model ID."""
        return self._request(f'MODEL={model_id}')

    def get_available_models(self):
        """Get the available models."""
        return self._request('MODELS?')

    def set_led_status(self, status):
        """Set the LED status."""
        return self._request(f'LED={self._flag(status)}')

    def yield_time(self, time_s):
        """Send ``AT+YIELD=<time_s>``."""
        return self._request(f'YIELD={time_s}')

    def _handle_incoming_data(self, data):
        """Buffer *data* and dispatch every complete frame to listeners.

        A frame is ``\\r{...}`` ending right before a newline. Each complete
        line is consumed exactly once whether or not it holds a frame, and a
        trailing partial line stays buffered for the next call. A listener
        whose ``"name"`` equals the payload's ``"name"`` gets the payload
        stored under ``"response"`` and its ``"event"`` set.
        """
        self._msg_buffer += data
        while True:
            end = self._msg_buffer.find(b'\n')
            if end < 0:
                return
            line = self._msg_buffer[:end]
            self._msg_buffer = self._msg_buffer[end + 1:]

            start = line.find(b'\r{')
            if start < 0 or not line.endswith(b'}'):
                continue  # not a frame: noise or echoed text
            try:
                payload = json.loads(line[start + 1:].decode('utf-8'))
                if "type" in payload:
                    for listener in self.listeners:
                        if listener["name"] == payload["name"]:
                            listener["response"] = payload
                            listener["event"].set()
            except Exception as ex:
                # Best effort: one bad frame must not stop later frames.
                print(f"Failed to handle incoming data: {ex}")
# Usage example. The __main__ guard keeps a plain import of this module from
# claiming the UART pins.
if __name__ == "__main__":
    uart = busio.UART(board.TX, board.RX, baudrate=921600)
    try:
        device = ATDevice(uart)
        # Get the current Wi-Fi status.
        print(device.get_wifi_status())
    finally:
        # Release TX/RX so a later run can create the UART again.
        # NOTE(review): presumably the reported "TX in use" error comes from
        # the pins still being claimed by an earlier UART object — confirm.
        uart.deinit()