|
#!/usr/bin/env python
|
|
|
|
# This script requires Python 3.
|
|
# $ pip install pyserial
|
|
# $ python gm30ctl.py
|
|
|
|
import io
|
|
import enum
|
|
import struct
|
|
import argparse
|
|
import typing as t
|
|
from pathlib import Path
|
|
from datetime import timedelta
|
|
from dataclasses import dataclass
|
|
|
|
import serial
|
|
|
|
|
|
##
|
|
# High level radio interface.
|
|
##
|
|
|
|
class RadioBootscreenMode(enum.Enum):
|
|
LOGO = enum.auto()
|
|
MESSAGE = enum.auto()
|
|
VOLTAGE = enum.auto()
|
|
|
|
|
|
@dataclass
|
|
class Radio:
|
|
bootscreen_mode: RadioBootscreenMode
|
|
bootscreen_line1: str
|
|
bootscreen_line2: str
|
|
|
|
|
|
class Protocol:
|
|
"""
|
|
Radioddity GM-30 serial control and programming protocol.
|
|
|
|
The protocol loosely follows a request/response flow with one-way or
|
|
bi-directional acknowledgements. It relies on hardware flow control.
|
|
|
|
Requests:
|
|
- Command Requests
|
|
- PSEARCH Request
|
|
- PASSSTA Request (?)
|
|
- SYSINFO Request (?)
|
|
|
|
PSEARCH Request:
|
|
- Send Bytes: 'PSEARCH' as ASCII
|
|
- Read Ack:
|
|
- 1x Byte: 0x06
|
|
- Read Variable Length Response: Firmware variant name as ASCII
|
|
- Known Variant: P13GMRS (US region GMRS firmware)
|
|
|
|
Command Requests:
|
|
- 1x Bytes: Request Type
|
|
- 0x or 4x Bytes: Parameters
|
|
|
|
Command Ack Request:
|
|
- Type: 0x06
|
|
- Parameters: N/A
|
|
- Response: 0x06
|
|
|
|
Command Read Request:
|
|
- Radio must be in programming mode first.
|
|
- Type: 0x52
|
|
- Parameters:
|
|
- 2x Bytes: Little-Endian Address
|
|
- 1x Byte: 0x00 (?)
|
|
- 1x Byte: Read Size
|
|
- Response:
|
|
- 1x Byte: 0x57
|
|
- 2x Bytes: Little-Endian Address
|
|
- 1x Byte: 0x00 (?)
|
|
- 1x Byte: Read Size (Not Including 5x Byte Header)
|
|
- Read Size x Bytes: Data
|
|
- Requires Ack Request After
|
|
|
|
Command Write Request:
|
|
- Radio must be in programming mode first.
|
|
- Type: 0x57
|
|
- Parameters:
|
|
- 2x Bytes: Little-Endian Address
|
|
- 1x Byte: 0x00 (?)
|
|
- 1x Byte: Write Size (Not Including 4x Byte Header)
|
|
- Write Size x Bytes: Data
|
|
- Response:
|
|
- 1x Byte: 0x06
|
|
"""
|
|
|
|
@staticmethod
|
|
def open_port(device_path: Path) -> serial.Serial:
|
|
return serial.Serial(
|
|
port=str(device_path),
|
|
baudrate=57600,
|
|
bytesize=serial.EIGHTBITS,
|
|
parity=serial.PARITY_NONE,
|
|
stopbits=serial.STOPBITS_ONE,
|
|
xonxoff=False,
|
|
rtscts=True,
|
|
dsrdtr=True,
|
|
timeout=None)
|
|
|
|
def __init__(self, port: serial.Serial, timeout = timedelta(seconds=1)):
|
|
self.port = port
|
|
self.timeout = timeout
|
|
|
|
def _reset(self):
|
|
# XXX: log warning if buffers are not empty
|
|
# XXX: not sure if this is actually necessary or useful
|
|
self.port.reset_input_buffer()
|
|
self.port.reset_output_buffer()
|
|
|
|
def _fixed_write(self, data: bytes) -> int:
|
|
self.port.write(data)
|
|
self.port.flush()
|
|
|
|
def _variable_read(self, max_count: int) -> bytes:
|
|
self.port.timeout = self.timeout.total_seconds()
|
|
return self.port.read(max_count)
|
|
|
|
def _fixed_read(self, expected_count: int) -> bytes:
|
|
response = self._variable_read(expected_count)
|
|
if not response:
|
|
self._reset()
|
|
raise RuntimeError("No response received")
|
|
|
|
if len(response) != expected_count:
|
|
self._reset()
|
|
raise RuntimeError("Unexpected read size")
|
|
|
|
return response
|
|
|
|
|
|
|
|
def send_ack(self):
|
|
self._fixed_write(bytes([0x06]))
|
|
|
|
def receive_ack(self):
|
|
response = self._fixed_read(1)
|
|
if response != bytes([0x06]):
|
|
raise RuntimeError("Failed to receive ACK")
|
|
|
|
def read_memory(self, address: int, size: int) -> bytes:
|
|
# Request: 0x52 ADDRx2 0x00 SIZEx1
|
|
request = bytearray([0x52, 0x00, 0x00, 0x00, 0x00])
|
|
struct.pack_into('<HxB', request, 1, address, size)
|
|
self._fixed_write(request)
|
|
|
|
# Response: 0x57 ADDRx2 0x00 SIZEx1 [DATAx1 .. DATAx1]
|
|
response = self._fixed_read(5 + size)
|
|
if response[0] != 0x57 or response[1:5] != request[1:5]:
|
|
raise RuntimeError("Read memory response invalid header")
|
|
|
|
# Sync
|
|
self.send_ack()
|
|
self.receive_ack()
|
|
|
|
return response[5:]
|
|
|
|
def write_memory(self, address: int, data: bytes):
|
|
# Request: 0x57 ADDRx2 0x00 SIZEx1 [DATAx1 .. DATAx2]
|
|
request = bytearray([0x57, 0x00, 0x00, 0x00, 0x00])
|
|
struct.pack_into('<HxB', request, 1, address, len(bytes))
|
|
self._fixed_write(request + data)
|
|
|
|
# Sync
|
|
self.receive_ack(port)
|
|
|
|
def read_memory_bytes(self, addresses: t.List[int]) -> bytes:
|
|
data = bytearray()
|
|
for address in addresses:
|
|
data += self.read_memory(address, 0x01)
|
|
|
|
return bytes(data)
|
|
|
|
def read_memory_range(self, address: int, size: int, chunk_size: int = 0x40) -> bytes:
|
|
data = bytearray()
|
|
|
|
read_address = address
|
|
read_bytes_remaining = size
|
|
while read_bytes_remaining > 0:
|
|
read_size = min(chunk_size, read_bytes_remaining)
|
|
data += self.read_memory(read_address, read_size)
|
|
|
|
read_bytes_remaining -= read_size
|
|
read_address += read_size
|
|
|
|
return bytes(data)
|
|
|
|
def write_memory_range(self, address: int, data: bytes, chunk_size: int = 0x40):
|
|
write_counter = 0
|
|
while write_counter < len(data):
|
|
write_size = min(chunk_size, len(data) - write_counter)
|
|
write_data = data[write_counter:write_counter + write_size]
|
|
write_memory(port, address + write_counter, write_data)
|
|
|
|
write_counter += write_size
|
|
|
|
def query_firmware_variant(self) -> str:
|
|
# Request
|
|
self._fixed_write(b'PSEARCH')
|
|
self.receive_ack()
|
|
|
|
# Response: firmware variant name
|
|
# Known variants: P13GMRS
|
|
response = self._variable_read(16)
|
|
if not response:
|
|
self._reset()
|
|
raise RuntimeError("Firmware variant query did not receive a response")
|
|
|
|
return response.decode()
|
|
|
|
##
|
|
# Protocol functionality not fully understood yet.
|
|
##
|
|
|
|
def unknown_passsta(protocol):
|
|
protocol._fixed_write(b'PASSSTA')
|
|
response = protocol._fixed_read(3)
|
|
assert response[:1].decode() == 'P'
|
|
assert response[1] == 0x00
|
|
assert response[2] == 0x00
|
|
|
|
|
|
def unknown_sysinfo(protocol):
|
|
protocol._fixed_write(b'SYSINFO')
|
|
protocol.receive_ack()
|
|
|
|
|
|
def common_init(
|
|
protocol,
|
|
query_unknown_passsta = True,
|
|
query_unknown_sysinfo = True
|
|
):
|
|
# querying for use later on when entering programming mode
|
|
# XXX not required to enter read/write mode
|
|
fw_variant = protocol.query_firmware_variant()
|
|
assert fw_variant == 'P13GMRS'
|
|
|
|
# XXX checking whether a password is set?
|
|
# XXX not required to enter programming mode
|
|
if query_unknown_passsta:
|
|
unknown_passsta(protocol)
|
|
|
|
# XXX required to enter programming mode
|
|
unknown_sysinfo(protocol)
|
|
|
|
# XXX some kind of timestamp query or checksum calculation?
|
|
# XXX seems to change based on the contents of radio memory
|
|
# XXX does not seem to change over time on it's own
|
|
# XXX requires sysinfo command to be sent first
|
|
# XXX not required to enter programming mode
|
|
if query_unknown_sysinfo:
|
|
# XXX
|
|
protocol._fixed_write(bytes([0x56, 0x00, 0x00, 0x0A, 0x0D]))
|
|
response = protocol._fixed_read(3)
|
|
assert response == bytes([0x56, 0x0D, 0x0A])
|
|
|
|
response = protocol._fixed_read(10)
|
|
print(response.hex(' '))
|
|
|
|
protocol.send_ack()
|
|
protocol.receive_ack()
|
|
|
|
# XXX
|
|
protocol._fixed_write(bytes([0x56, 0x00, 0x10, 0x0A, 0x0D]))
|
|
response = protocol._fixed_read(3)
|
|
assert response == bytes([0x56, 0x0D, 0x0A])
|
|
|
|
response = protocol._fixed_read(10)
|
|
print(response.hex(' '))
|
|
|
|
protocol.send_ack()
|
|
protocol.receive_ack()
|
|
|
|
# XXX
|
|
protocol._fixed_write(bytes([0x56, 0x00, 0x20, 0x0A, 0x0D]))
|
|
response = protocol._fixed_read(3)
|
|
assert response == bytes([0x56, 0x0D, 0x0A])
|
|
|
|
response = protocol._fixed_read(10)
|
|
print(response.hex(' '))
|
|
|
|
protocol.send_ack()
|
|
protocol.receive_ack()
|
|
|
|
# XXX seems to be different variant then three queries above?
|
|
protocol._fixed_write(bytes([0x56, 0x00, 0x00, 0x00, 0x0A]))
|
|
response = protocol._fixed_read(3)
|
|
print(response.hex(' '))
|
|
|
|
response = protocol._fixed_read(3)
|
|
print(response.hex(' '))
|
|
|
|
response = protocol._fixed_read(5)
|
|
print(response.hex(' '))
|
|
|
|
protocol.send_ack()
|
|
protocol.receive_ack()
|
|
|
|
# XXX: this seems to set a timeout where if no further commands are
|
|
# received within a certain window the radio will reset
|
|
# required to enter programming mode
|
|
protocol._fixed_write(bytes([0xFF, 0xFF, 0xFF, 0xFF, 0x0C]))
|
|
protocol._fixed_write(fw_variant.encode()) # XXX: b'P13GMRS'
|
|
protocol.receive_ack()
|
|
|
|
# XXX required to enter programming mode
|
|
protocol._fixed_write(bytes([0x02]))
|
|
response = protocol._fixed_read(8)
|
|
assert response == bytes([0xFF] * 8)
|
|
|
|
protocol.send_ack()
|
|
protocol.receive_ack()
|
|
|
|
##
|
|
# Console Interface
|
|
##
|
|
|
|
def read(device_path: Path):
|
|
with Protocol.open_port(device_path) as port:
|
|
protocol = Protocol(port)
|
|
common_init(protocol)
|
|
|
|
# XXX no idea what this data is or why it's read
|
|
data = protocol.read_memory_bytes([
|
|
0x1FFF,
|
|
0x2FFF,
|
|
0x3FFF,
|
|
0x4FFF,
|
|
0x5FFF,
|
|
0x6FFF,
|
|
0x7FFF,
|
|
0x8FFF,
|
|
0x9FFF,
|
|
0xAFFF,
|
|
0xBFFF,
|
|
0xCFFF,
|
|
0xDFFF,
|
|
0xEFFF,
|
|
0xFFFF])
|
|
|
|
with open('region_1FFF.bin', 'wb') as output_file:
|
|
output_file.write(data)
|
|
|
|
# XXX data file 0x2000, ???
|
|
data = protocol.read_memory_range(0x1000, 0xFC0)
|
|
with open('region_1000.bin', 'wb') as output_file:
|
|
output_file.write(data)
|
|
|
|
# XXX data file 0x3000 with some differences, ???
|
|
data = protocol.read_memory_range(0xF000, 0xFC0)
|
|
with open('region_F000.bin', 'wb') as output_file:
|
|
output_file.write(data)
|
|
|
|
# XXX data file 0x4000, channel names?
|
|
data = protocol.read_memory_range(0x3000, 0xFC0)
|
|
with open('region_3000.bin', 'wb') as output_file:
|
|
output_file.write(data)
|
|
|
|
# XXX data file 0x5000, general settings?
|
|
data = protocol.read_memory_range(0xB000, 0xFC0)
|
|
with open('region_B000.bin', 'wb') as output_file:
|
|
output_file.write(data)
|
|
|
|
# XXX data file 0x6000, ???
|
|
data = protocol.read_memory_range(0xD000, 0xFC0)
|
|
with open('region_D000.bin', 'wb') as output_file:
|
|
output_file.write(data)
|
|
|
|
|
|
def write(device_path: Path):
|
|
with Protocol.open_port(device_path) as port:
|
|
protocol = Protocol(port)
|
|
common_init(protocol)
|
|
|
|
# XXX no idea what this data is or why it's read
|
|
protocol.read_memory_bytes([
|
|
0x1FFF,
|
|
0x2FFF,
|
|
0x3FFF,
|
|
0x4FFF,
|
|
0x5FFF,
|
|
0x6FFF,
|
|
0x7FFF,
|
|
0x8FFF,
|
|
0x9FFF,
|
|
0xAFFF,
|
|
0xBFFF,
|
|
0xCFFF,
|
|
0xDFFF,
|
|
0xEFFF,
|
|
0xFFFF])
|
|
|
|
# XXX data file 0x3000, ???
|
|
with open('region_7000.bin', 'rb') as input_file:
|
|
data = input_file.read()
|
|
|
|
assert len(data) == 0xFC0
|
|
protocol.write_memory_range(0x7000, data)
|
|
|
|
# XXX data file 0x4000, channel names?
|
|
with open('region_3000.bin', 'rb') as input_file:
|
|
data = input_file.read()
|
|
|
|
assert len(data) == 0xFC0
|
|
protocol.write_memory_range(0x3000, data)
|
|
|
|
# XXX data file offset 0x5000, general settings?
|
|
with open('region_4000.bin', 'rb') as input_file:
|
|
data = input_file.read()
|
|
|
|
assert len(data) == 0xFC0
|
|
protocol.write_memory_range(0x4000, data)
|
|
|
|
# XXX data file offset 0x6000, ???
|
|
with open('region_D000.bin', 'rb') as input_file:
|
|
data = input_file.read()
|
|
|
|
assert len(data) == 0xFC0
|
|
protocol.write_memory_range(0xD000, data)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
'-d', '--device',
|
|
type=Path,
|
|
default='/dev/ttyUSB0')
|
|
|
|
subparsers = parser.add_subparsers()
|
|
|
|
parser_read = subparsers.add_parser('read', help="read radio memory")
|
|
parser_read.set_defaults(run=read)
|
|
|
|
parser_write = subparsers.add_parser('write', help="write radio memory")
|
|
parser_write.set_defaults(run=write)
|
|
|
|
args = parser.parse_args()
|
|
args.run(device_path=args.device)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|