Project

General

Profile

New Model #9237 » gm30ctl.py

Alexandru Barbur, 05/29/2022 02:00 AM

 
#!/usr/bin/env python

# This script requires Python 3.
# $ pip install pyserial
# $ python gm30ctl.py

import io
import struct
import argparse
import typing as t
from pathlib import Path

import serial


def send_ack(port):
port.write(bytes([0x06]))
port.flush()


def receive_ack(port):
response = port.read(1)
assert response == bytes([0x06])


def read_memory(port, address: int, size: int) -> bytes:
# Request: 0x52 ADDRx2 0x00 SIZEx1
request = bytearray([0x52, 0x00, 0x00, 0x00, 0x00])
struct.pack_into('<HxB', request, 1, address, size)

port.write(request)
port.flush()

# Response: 0x57 ADDRx2 0x00 SIZEx1 [DATAx1 .. DATAx1]
expected_response_size = 5 + size
response = port.read(expected_response_size)
assert len(response) == expected_response_size
assert response[0] == 0x57
assert response[1:5] == request[1:5]

send_ack(port)
receive_ack(port)

return response[5:]


def write_memory(port, data: bytes, address: int):
# Request: 0x57 ADDRx2 0x00 SIZEx1 [DATAx1 .. DATAx2]
request = bytearray([0x57, 0x00, 0x00, 0x00, 0x00])
struct.pack_into('<HxB', request, 1, address, len(bytes))

port.write(request)
port.write(data)
port.flush()

# Response
receive_ack(port)


def read_memory_bytes(port, addresses: t.List[int]) -> bytes:
buffer = io.BytesIO()
for address in addresses:
buffer.write(read_memory(port, address, 0x01))

buffer.seek(0)
return buffer.read()


def read_memory_range(port, address: int, size: int, chunk_size: int = 0x40) -> bytes:
buffer = io.BytesIO()

read_address = address
read_bytes_remaining = size
while read_bytes_remaining > 0:
read_size = min(chunk_size, read_bytes_remaining)
buffer.write(read_memory(port, read_address, read_size))

read_bytes_remaining -= read_size
read_address += read_size

buffer.seek(0)
return buffer.read()


def write_memory_range(port, data: bytes, address: int, chunk_size: int = 0x40):
write_counter = 0
while write_counter < len(bytes):
write_size = min(chunk_size, len(bytes) - write_counter)
write_data = data[write_counter:write_counter + write_size]
write_memory(port, address + write_counter, write_data)

write_counter += write_size


def common_init(port):
# PSEARCH
port.write(b'PSEARCH')
port.flush()

receive_ack(port)

response = port.read(7)
assert response.decode() == 'P13GMRS'

# XXX
port.write(b'PASSSTA')
port.flush()

response = port.read(3)
assert response[:1].decode() == 'P'
assert response[1] == 0x00
assert response[2] == 0x00

# XXX
port.write(b'SYSINFO')
port.flush()

receive_ack(port)

# XXX
port.write(bytes([0x56, 0x00, 0x00, 0x0A, 0x0D]))
port.flush()

response = port.read(3)
assert response == bytes([0x56, 0x0D, 0x0A])

response = port.read(10)
print(' '.join([f"0x{b:x}" for b in response]))
# assert response == bytes([
# 0x0A,
# 0x0D,
# 0x00,
# 0x00,
# 0x00,
# 0x11,
# 0xCA,
# 0x00,
# 0x08,
# 0x01])

send_ack(port)
receive_ack(port)

# XXX
port.write(bytes([0x56, 0x00, 0x10, 0x0A, 0x0D]))
port.flush()

response = port.read(3)
assert response == bytes([0x56, 0x0D, 0x0A])

response = port.read(10)
print(' '.join([f"0x{b:x}" for b in response]))
# assert response == bytes([
# 0x0A,
# 0x0D,
# 0x00,
# 0x00,
# 0x00,
# 0x11,
# 0xCA,
# 0x00,
# 0x08,
# 0x01])

send_ack(port)
receive_ack(port)

# XXX
port.write(bytes([0x56, 0x00, 0x20, 0x0A, 0x0D]))
port.flush()

response = port.read(3)
assert response == bytes([0x56, 0x0D, 0x0A])

response = port.read(10)
print(' '.join([f"0x{b:x}" for b in response]))
# assert response == bytes([
# 0x0A,
# 0x0D,
# 0x00,
# 0x00,
# 0x00,
# 0x11,
# 0xCA,
# 0x00,
# 0x08,
# 0x01])

send_ack(port)
receive_ack(port)

# XXX
port.write(bytes([0x56, 0x00, 0x00, 0x00, 0x0A]))
port.flush()

response = port.read(3)
print(' '.join([f"0x{b:x}" for b in response]))
# assert response == bytes([0x56, 0x0D, 0x0A])

response = port.read(3)
print(' '.join([f"0x{b:x}" for b in response]))
# assert response == bytes([0x56, 0x0A, 0x08])

response = port.read(5)
print(' '.join([f"0x{b:x}" for b in response]))
# assert response == bytes([0x00, 0x10, 0x00, 0x00, 0xFF])

send_ack(port)
receive_ack(port)

# XXX
port.write(bytes([0xFF, 0xFF, 0xFF, 0xFF, 0x0C]))
port.flush()

port.write(b'P13GMRS')
port.flush()

receive_ack(port)

# XXX
port.write(bytes([0x02]))
port.flush()

response = port.read(8)
assert response == bytes([0xFF] * 8)

send_ack(port)
receive_ack(port)

# XXX
data = read_memory_bytes(port, [
0x1FFF,
0x2FFF,
0x3FFF,
0x4FFF,
0x5FFF,
0x6FFF,
0x7FFF,
0x8FFF,
0x9FFF,
0xAFFF,
0xBFFF,
0xCFFF,
0xDFFF,
0xEFFF,
0xFFFF])

with open('region_1FFF.bin', 'wb') as output_file:
output_file.write(data)


def read(device_path: Path):
with serial.Serial(
port=str(device_path),
baudrate=57600,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
xonxoff=False,
rtscts=True,
dsrdtr=True
) as port:
common_init(port)

# XXX data file 0x2000, ???
data = read_memory_range(port, 0x1000, 0xFC0, 0x40)
with open('region_1000.bin', 'wb') as output_file:
output_file.write(data)

# XXX data file 0x3000 with some differences, ???
data = read_memory_range(port, 0xF000, 0xFC0, 0x40)
with open('region_F000.bin', 'wb') as output_file:
output_file.write(data)

# XXX data file 0x4000, channel names?
data = read_memory_range(port, 0x3000, 0xFC0, 0x40)
with open('region_3000.bin', 'wb') as output_file:
output_file.write(data)

# XXX data file 0x5000, general settings?
data = read_memory_range(port, 0xB000, 0xFC0, 0x40)
with open('region_B000.bin', 'wb') as output_file:
output_file.write(data)

# XXX data file 0x6000, ???
data = read_memory_range(port, 0xD000, 0xFC0, 0x40)
with open('region_D000.bin', 'wb') as output_file:
output_file.write(data)


def write(device_path: Path):
with serial.Serial(
port=str(device_path),
baudrate=57600,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
xonxoff=False,
rtscts=True,
dsrdtr=True
) as port:
common_init(port)

# XXX data file 0x3000, ???
with open('region_7000.bin', 'rb') as input_file:
data = input_file.read()

assert len(data) == 0xFC0
write_memory_range(port, data, 0x7000, 0x40)

# XXX data file 0x4000, channel names?
with open('region_3000.bin', 'rb') as input_file:
data = input_file.read()

assert len(data) == 0xFC0
write_memory_range(port, data, 0x3000, 0x40)

# XXX data file offset 0x5000, general settings?
with open('region_4000.bin', 'rb') as input_file:
data = input_file.read()

assert len(data) == 0xFC0
write_memory_range(port, data, 0x4000, 0x40)

# XXX data file offset 0x6000, ???
with open('region_D000.bin', 'rb') as input_file:
data = input_file.read()

assert len(data) == 0xFC0
write_memory_range(port, data, 0xD000, 0x40)


def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'-d', '--device',
type=Path,
default='/dev/ttyUSB0')

subparsers = parser.add_subparsers()

parser_read = subparsers.add_parser('read', help="read radio memory")
parser_read.set_defaults(run=read)

parser_write = subparsers.add_parser('write', help="write radio memory")
parser_write.set_defaults(run=write)

args = parser.parse_args()
args.run(device_path=args.device)


if __name__ == '__main__':
main()
(6-6/11)