The code:
#!/usr/bin/env python3
import smbus2
import RPi.GPIO as GPIO
import time
def usleep(x):
    """Sleep for ``x`` microseconds.

    Converts the microsecond count to seconds and delegates to
    ``time.sleep``; returns whatever ``time.sleep`` returns (``None``).
    """
    return time.sleep(x / 1000000.0)
# I2C address used for every transfer to the sensor.
address = 0x42
# Open I2C bus number 1.
bus = smbus2.SMBus(1)
# TRANS_PIN and RESET_PIN below are given in the BOARD numbering scheme.
GPIO.setmode(GPIO.BOARD)
# Handshake line: polled as an input and temporarily driven low by
# mg3030_read_data() around each I2C read.
TRANS_PIN = 7
# Output line pulsed low then high below (presumably the sensor's reset
# input, going by the name -- confirm against the wiring).
RESET_PIN = 11
# http://ww1.microchip.com/downloads/en/DeviceDoc/40001718E.pdf
# Start with TRANS_PIN as a pulled-up input and RESET_PIN driven low.
GPIO.setup(TRANS_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(RESET_PIN, GPIO.OUT, initial=GPIO.LOW)
# Hold RESET_PIN low for 10 ms, then drive it high and wait 50 ms before
# any bus traffic.
GPIO.output(RESET_PIN, GPIO.LOW)
usleep(10000)
GPIO.output(RESET_PIN, GPIO.HIGH)
usleep(50000)
def mg3030_read_data(size=140):
    """Read one message from the sensor over I2C.

    Waits until TRANS_PIN reads low, then takes over the line by driving
    it low, reads ``size`` bytes from the sensor, and finally releases the
    line back to a pulled-up input. (This looks like the sensor's transfer
    handshake -- confirm against the datasheet.)

    Args:
        size: Number of bytes to request in the I2C read.

    Returns:
        A ``(header, payload)`` tuple. The first received byte is taken as
        the message length; when it is greater than 4, ``header`` is the
        first four bytes and ``payload`` is bytes 4 up to that length.
        Both are ``None`` if the wait timed out or the length is 4 or less.
    """
    time_out = 0
    header = None
    payload = None

    # Poll the line every 200 us; give up after more than 100 polls.
    while GPIO.input(TRANS_PIN):
        usleep(200)
        time_out = time_out + 1
        if time_out > 100:
            print("wait for sensor data time out!")
            return header, payload

    if not GPIO.input(TRANS_PIN):
        # Drive the line low ourselves for the duration of the transfer.
        GPIO.setup(TRANS_PIN, GPIO.OUT, initial=GPIO.LOW)
        try:
            usleep(10000)
            data = smbus2.i2c_msg.read(address, size)
            bus.i2c_rdwr(data)
            data = bytes(data)
            # Byte 0 carries the message length; guard against an empty read.
            length = data[0] if data else 0
            if length > 4:
                header = data[0:4]
                payload = data[4:length]
            usleep(10000)
        finally:
            # Always release the line, even if the bus transfer raised;
            # otherwise the pin would stay driven low as an output.
            GPIO.output(TRANS_PIN, GPIO.HIGH)
            GPIO.setup(TRANS_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
    return header, payload
# Message IDs used when building frames for the sensor.
# Header ID (4th frame byte) of the frame sent by mg3030_set_runtime_param().
SET_RUNTIME_PARAMETER = 0xA2
# Message ID passed to mg3030_request_message() by the script below.
FW_VERSION_INFO = 0x83
# Header ID (4th frame byte) of the frame sent by mg3030_request_message().
REQUEST_MESSAGE = 0x06
def mg3030_request_message(message_id=SET_RUNTIME_PARAMETER, param=0):
    """Send a REQUEST_MESSAGE frame and read back the reply.

    Frame layout (12 bytes):
        byte 0      total frame length
        byte 1      flag (always 0)
        byte 2      sequence (always 0)
        byte 3      REQUEST_MESSAGE command id
        byte 4      ``message_id`` of the message being requested
        bytes 5-7   zero
        bytes 8-11  ``param``, low 32 bits, little-endian

    Args:
        message_id: ID of the message to request; must fit in one byte.
        param: Integer parameter; only its low 32 bits are sent.

    Returns:
        The ``(header, payload)`` tuple from ``mg3030_read_data(16)``.
    """
    body = bytes([message_id, 0, 0, 0]) + (param & 0xFFFFFFFF).to_bytes(4, "little")
    # The length byte counts the 4-byte header plus the body.
    header = bytes([4 + len(body), 0, 0, REQUEST_MESSAGE])
    frame = header + body
    print("out >>>>>", (frame[0:4], frame[4:]))
    bus.i2c_rdwr(smbus2.i2c_msg.write(address, list(frame)))
    return mg3030_read_data(16)
def mg3030_set_runtime_param(run_time_id, arg0=0, arg1=0):
    """Send a SET_RUNTIME_PARAMETER frame and read back the reply.

    Frame layout (16 bytes):
        byte 0       total frame length
        byte 1       flag (always 0)
        byte 2       sequence (always 0)
        byte 3       SET_RUNTIME_PARAMETER command id
        bytes 4-5    ``run_time_id``, little-endian
        bytes 6-7    reserved (zero)
        bytes 8-11   ``arg0``, low 32 bits, little-endian
        bytes 12-15  ``arg1``, low 32 bits, little-endian

    Args:
        run_time_id: Runtime parameter ID; must be in ``0..0xFFFF``.
        arg0: First argument; only its low 32 bits are sent.
        arg1: Second argument; only its low 32 bits are sent.

    Returns:
        The ``(header, payload)`` tuple from ``mg3030_read_data(16)``.

    Raises:
        ValueError: If ``run_time_id`` does not fit in 16 bits. Raised
            before anything is printed or written to the bus.
    """
    # Fail with a clear message instead of an obscure bytes() range error.
    if not 0 <= run_time_id <= 0xFFFF:
        raise ValueError(
            "run_time_id must be in range 0..0xFFFF, got %r" % (run_time_id,)
        )

    body = (
        bytes([run_time_id & 0xFF, run_time_id >> 8])
        + bytes(2)  # two reserved bytes
        + (arg0 & 0xFFFFFFFF).to_bytes(4, "little")
        + (arg1 & 0xFFFFFFFF).to_bytes(4, "little")
    )
    # The length byte counts the 4-byte header plus the body.
    header = bytes([4 + len(body), 0, 0, SET_RUNTIME_PARAMETER])
    frame = header + body
    print("out >>>>>", (frame[0:4], frame[4:]))
    bus.i2c_rdwr(smbus2.i2c_msg.write(address, list(frame)))
    return mg3030_read_data(16)
def echo_test():
    """Send a 4-byte, header-only frame with id 0x40 and read the reply.

    The frame is: length byte, flag 0, sequence 1, id 0x40.

    Returns:
        The ``(header, payload)`` tuple from ``mg3030_read_data(16)``.
    """
    # Everything after the length byte: flag, sequence, id.
    tail = [0, 1, 0x40]
    frame = [len(tail) + 1] + tail
    sent_header = bytes(frame[0:4])
    sent_body = bytes(frame[4:])
    print("out >>>>>", (sent_header, sent_body))
    message = smbus2.i2c_msg.write(address, frame)
    bus.i2c_rdwr(message)
    return mg3030_read_data(16)
# Script body: read the first message, query the firmware version, set two
# runtime parameters, then dump five more messages from the sensor.
# The GPIO pins and the I2C bus are released even if a transfer raises.
try:
    print("in <<<<<<", mg3030_read_data(132))
    print("in <<<<<<", mg3030_request_message(FW_VERSION_INFO))
    print("in <<<<<<", mg3030_set_runtime_param(0x80, 0x1b, 0x1f))
    print("in <<<<<<", mg3030_set_runtime_param(0xa1, 0, 0xffffffff))
    #print("in <<<<<<", echo_test())
    for _ in range(5):
        print("in <<<<<<", mg3030_read_data())
finally:
    try:
        GPIO.cleanup()
    finally:
        bus.close()