# Source: "New Model #4129" » thd74.py
# Uploaded by Eric Wolak, 08/17/2019 02:45 PM

 
import logging
import struct

import time

from chirp import directory, bitwise, errors, chirp_common, memmap

import thd72
from chirp.util import hexprint

LOG = logging.getLogger(__name__)

# Notes on the TH-D74 memory map.
#
# Save files from MCP-D74 have a 256-byte header, and possibly some oddness.
#
#   0x02000: memory flags, 4 bytes per memory
#   0x04000: memories, each 40 bytes long
#   0x10000: names, each 16 bytes long, null padded, ascii encoded
#
# Memory channel byte layout as originally noted by the author (the column
# alignment of this sketch was lost; treat the struct in mem_format as the
# authoritative layout):
#
#   0 1 2 3 4 5 6 7 8 9 a b c d e f
#   [freq ] ? mode tmode/duplex rtone ctone dtcs cross_mode [offset] ?
#
# Frequency is 32-bit unsigned little-endian Hz.

# Some of the blocks written by MCP have a length set of less than 0x00/256.
# Maps block number -> number of bytes to write for that block; blocks that
# are not listed are written in full.
BLOCK_SIZES = {
    0x0003: 0x00B4,  # 180 bytes
    0x0007: 0x0068,  # 104 bytes
}

# Memory layout description parsed by chirp's bitwise module.
#
# Each inner "mem" record is 40 bytes:
#   freq(4) + offset(4) + 7 single bytes + 3 * 8-byte callsigns + 1 byte
# and six records plus a 16-byte trailer make up one 256-byte block.
#
# Fix: cross_mode and digital_squelch are two 4-bit fields sharing one byte,
# so they must be separated by a comma like the other bitfield pairs above
# them.  The comma was missing, which left the definition malformed.
mem_format = """
// TODO: find lockout

#seekto 0x02000;
struct {
  // 4 bytes long
  u8 disabled;
  u8 unk;
  u8 group;
  u8 unk2;
} flag[1000];

#seekto 0x04000;
// TODO: deal with the 16-byte trailers of every block
struct {
  struct {
    ul32 freq;
    ul32 offset;
    u8 tuning_step:4,
       unk:4;
    u8 mode:4,
       unk1:4;
    u8 tone_mode:4,
       duplex:4;
    u8 rtone;
    u8 ctone;
    u8 dtcs;
    u8 cross_mode:4,
       digital_squelch:4;
    char urcall[8];
    char rpt1[8];
    char rpt2[8];
    u8 digital_squelch_code;
  } mem[6];
  u8 pad[16];
} memory[160]; // TODO: correct number of memories

#seekto 0x10000;
struct {
  char name[16];
} channel_name[1000];
"""

# Tuning steps in kHz, indexed by the 4-bit tuning_step field of a memory.
# None marks index values with no known step.
STEPS = [
    5.0, 6.25, None, None,
    10.0, 12.5, 15.0, 20.0,
    25.0, 50.0, 100.0, 9.0,
]

# Operating modes, indexed by the 4-bit mode field of a memory.
# NOTE(review): "DV" appears at both index 1 and index 7 -- confirm whether
# index 7 is really a distinct mode.
MODES = ["FM", "DV", "AM", "LSB", "USB", "CW", "NFM", "DV"]

def hex(data):
    """Return *data* as an upper-case hex dump, 16 bytes per line.

    Every byte is rendered as two hex digits followed by a space, rows are
    separated by a newline, and leading/trailing whitespace of the whole
    dump is stripped (so only the last row loses its trailing space).

    The original relied on ``str.encode("hex")``, which only exists on
    Python 2.  ``binascii.hexlify`` produces the same digits on Python 2
    and Python 3.

    NOTE: the name shadows the ``hex`` builtin inside this module; it is
    kept because other code in this file calls it by this name.

    :param data: bytes-like object, or a text string whose characters are
                 all in the range 0-255
    :returns: the formatted dump as a native string ("" for empty input)
    """
    import binascii  # local import keeps this helper self-contained

    if not isinstance(data, (bytes, bytearray)):
        # Text string: map code points 0-255 straight onto byte values.
        data = data.encode("latin-1")

    rows = []
    for start in range(0, len(data), 16):
        digits = binascii.hexlify(data[start:start + 16]).upper()
        if not isinstance(digits, str):
            # Python 3 returns bytes from hexlify
            digits = digits.decode("ascii")
        rows.append("".join(digits[pos:pos + 2] + " "
                            for pos in range(0, len(digits), 2)))
    return "\n".join(rows).strip()

class SProxy(object):
    """Logging proxy around a serial-port-like object.

    read() and write() are forwarded to the wrapped object and the
    transferred data is logged as a hex dump at debug level.

    Fix: the original only forwarded read, write and timeout.  Code using
    the proxy also sets ``baudrate``/``rts``/``dtr`` and calls ``setRTS()``
    / ``setDTR()``; those assignments ended up as plain attributes on the
    proxy and the method calls always raised AttributeError.  Every other
    attribute is now forwarded to the wrapped object as well.
    """

    def __init__(self, delegate):
        """Wrap *delegate*, the object that performs the real I/O."""
        self.delegate = delegate

    def __getattr__(self, name):
        """Forward attributes not defined on the proxy to the delegate."""
        # Only reached when normal lookup fails.  Guard against recursing
        # on "delegate" itself if it has not been assigned yet.
        if name == "delegate":
            raise AttributeError(name)
        return getattr(self.delegate, name)

    def __setattr__(self, name, value):
        """Store proxy-owned attributes locally, forward everything else."""
        if name == "delegate" or hasattr(type(self), name):
            # "delegate" itself, or something the class defines (such as
            # the timeout property, whose setter then forwards the value).
            object.__setattr__(self, name, value)
        else:
            setattr(self.delegate, name, value)

    def read(self, len):
        """Read up to *len* bytes from the delegate, logging what arrived.

        The parameter name shadows the builtin; it is kept so existing
        keyword callers keep working.
        """
        received = self.delegate.read(len)
        LOG.debug("READ\n" + hex(received))
        return received

    def write(self, data):
        """Log *data* and write it to the delegate; returns its result."""
        LOG.debug("WRITE\n" + hex(data))
        return self.delegate.write(data)

    @property
    def timeout(self):
        """Timeout of the wrapped object."""
        return self.delegate.timeout

    @timeout.setter
    def timeout(self, timeout):
        self.delegate.timeout = timeout



@directory.register
class THD74Radio(thd72.THD72Radio):
    """Clone-mode driver for the TH-D74, built on top of the TH-D72 driver.

    Memory is transferred in 256-byte blocks.  Channel data is decoded with
    the layout in ``mem_format``; helpers that are not defined here
    (get_channel_name, set_channel_name, add_dirty_block, initialize, ...)
    come from the TH-D72 base class.
    """
    MODEL = "TH-D74 (clone mode)"
    _memsize = 500480
    # I think baud rate might be ignored by USB-Serial stack of the D74's
    # on-board FTDI chip, but it doesn't seem to hurt.
    BAUD_RATE = 115200

    # Value of flag.disabled that marks a memory as unused.  get_memory
    # always tested for 0xFF; set_memory used 0xf, so a channel erased by
    # set_memory was not reported as empty by get_memory.
    _FLAG_EMPTY = 0xFF

    # Blocks that upload() must never write in the regular loop: block 0 is
    # written by _unlock(), and MCP-D74 skips block 1279 (writing it causes
    # a NACK on the following block).
    _UPLOAD_SKIP = (0, 1279)

    def __init__(self, pipe):
        """Create the driver, wrapping a serial pipe in a logging proxy.

        Only objects that offer read() and write() are wrapped; anything
        else is handed to the base class untouched.
        NOTE(review): presumably the base class also accepts non-port
        arguments (e.g. an image file name); the original wrapped those in
        SProxy too, which hid their real type -- confirm against the base
        class.
        """
        if hasattr(pipe, "read") and hasattr(pipe, "write"):
            pipe = SProxy(pipe)
        super(THD74Radio, self).__init__(pipe)

    def get_features(self):
        """Return the base driver's features with tuning step enabled."""
        rf = super(THD74Radio, self).get_features()
        rf.has_tuning_step = True
        return rf

    def process_mmap(self):
        """Parse the memory map and reset the list of modified blocks."""
        self._memobj = bitwise.parse(mem_format, self._mmap)
        self._dirty_blocks = []

    def sync_in(self):
        """Download the radio's memory and parse it."""
        # self._detect_baud()
        self._mmap = self.download()
        self.process_mmap()

    def sync_out(self):
        """Upload the modified blocks, or everything if none are recorded."""
        if self._dirty_blocks:
            self.upload(self._dirty_blocks)
        else:
            self.upload()

    def read_block(self, block, count=256):
        """Read *count* bytes of block number *block* from the radio.

        Sends an "R" request, validates the 5-byte "W" response header,
        reads the payload and exchanges the 0x06 acknowledgements.

        :raises Exception: on a short/invalid header or missing ACK
        :raises errors.RadioError: if the radio stops sending payload data
        """
        # The length field is a single byte, so a full block is sent as 0.
        cmd = struct.pack(">cHH", "R", block, count % 256)
        self.pipe.write(cmd)

        r = self.pipe.read(5)
        if len(r) != 5:
            raise Exception("Did not receive block response")

        cmd, _block, _ = struct.unpack(">cHH", r)
        if cmd != "W" or _block != block:
            raise Exception("Invalid response: %s %i" % (cmd, _block))

        data = ""
        while len(data) < count:
            chunk = self.pipe.read(count - len(data))
            if not chunk:
                # The original looped forever when a read returned nothing.
                raise errors.RadioError(
                    "Radio stopped responding while reading block %i" % block)
            data += chunk

        self.pipe.write(chr(0x06))
        if self.pipe.read(1) != chr(0x06):
            raise Exception("Did not receive post-block ACK!")

        return data

    def write_block(self, block, map, count=256):
        """Write *count* bytes of block *block* taken from *map*.

        :param map: the memory image; bytes block*256 .. block*256+count
                    are sent (the name shadows the builtin but is part of
                    the existing signature)
        :returns: True if the radio acknowledged the block with 0x06
        """
        header = struct.pack(">cHH", "W", block, count % 256)
        base = block * 256
        payload = map[base:base + count]
        # It's crucial that these are written together. Otherwise the radio
        # will fail to ACK under some conditions.
        self.pipe.write(header + payload)

        return self.pipe.read(1) == chr(0x06)

    def _expect_ack(self):
        """Read one byte and raise RadioError unless it is the 0x06 ACK."""
        ack = self.pipe.read(1)
        if ack != chr(0x06):
            # ord() needs exactly one character; an empty read used to
            # raise TypeError here instead of the intended RadioError.
            shown = ord(ack) if len(ack) == 1 else repr(ack)
            raise errors.RadioError(
                "Expected ack but got {} ({})".format(shown, type(ack)))

    def _unlock(self):
        """Voodoo sequence of operations to get the radio to accept our programming.

        Reads the first 6 bytes of block 0, then sends two fixed "W"
        writes to block 0, each of which must be acknowledged.

        :raises errors.RadioError: if either write is not acknowledged
        """
        # The data read here is not used; the read is part of the sequence.
        self.read_block(0, 6)

        self.pipe.write(struct.pack(">cHH", "W", 0, 0x30))
        # magic unlock sequence
        unlock = ("\xFF\x01\xFF\x00\x00\xFF\xFF\xFF\xFF\xFF\x01\xFF\xFF\xFF\x03\x01" +
                  "\x00\x00\x00\x00\x02\x00\x30\x30\x30\x00\xFF\xFF\xFF\xFF\xFF\xFF" +
                  "\xFF\x00\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF")
        self.pipe.write(unlock)
        self._expect_ack()

        self.pipe.write(struct.pack(">cHH", "W", 0, 0x38C8))
        # magic unlock sequence
        unlock = [0xFF] * 8 + [0] * 160 + [0xFF] * 32
        self.pipe.write("".join([chr(x) for x in unlock]))

        time.sleep(0.01)
        self._expect_ack()

    def _report_status(self, msg, cur, total):
        """Send a progress update to status_fn, if one is set."""
        if self.status_fn:
            s = chirp_common.Status()
            s.msg = msg
            s.max = total
            s.cur = cur
            self.status_fn(s)

    def download(self, raw=False, blocks=None):
        """Read the radio's memory.

        :param raw: return the raw data string instead of a MemoryMap
        :param blocks: block numbers to read (default: all); blocks that
                       are not requested are filled with 0xFF bytes
        :raises errors.RadioError: if the radio does not enter PROGRAM mode
        """
        nblocks = self._memsize // 256
        if blocks is None:
            blocks = list(range(nblocks))
        else:
            blocks = [b for b in blocks if b < nblocks]

        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
            raise errors.RadioError("Radio didn't go into PROGRAM mode")

        self.pipe.baudrate = 57600
        try:
            self.pipe.setRTS()
        except AttributeError:
            self.pipe.rts = True
        self.pipe.read(1)

        if blocks:
            # guarded: an empty selection used to raise IndexError here
            LOG.debug("reading blocks %d..%d" % (blocks[0], blocks[-1]))
        wanted = set(blocks)
        total = len(blocks)
        count = 0
        pieces = []
        for i in range(nblocks):
            if i not in wanted:
                pieces.append(256 * '\xff')
                continue
            pieces.append(self.read_block(i))
            count += 1
            self._report_status("Cloning from radio", count, total)

        self.pipe.write("E")

        data = "".join(pieces)
        if raw:
            return data
        return memmap.MemoryMap(data)

    def upload(self, blocks=None):
        """Write blocks of the memory image to the radio.

        :param blocks: block numbers to write; by default every block
                       except the last two
        :raises errors.RadioError: if the radio does not enter PROGRAM
                                   mode, the unlock sequence is not
                                   acknowledged, or a block write fails
        """
        # MCP-D74 sets DTR, so we should too
        try:
            self.pipe.setDTR()
        except AttributeError:
            self.pipe.dtr = True

        nblocks = self._memsize // 256
        if blocks is None:
            blocks = range(nblocks - 2)

        # Block 0 definitely isn't written conventionally, so we let
        # _unlock handle it and skip.  For some reason MCP-D74 skips block
        # 1279; if we don't, we'll get a NACK on the next one.
        # The original used blocks.remove(), which raised ValueError
        # whenever the selection (e.g. the dirty-block list) did not
        # contain both of those blocks.
        blocks = [b for b in blocks
                  if b < nblocks and b not in self._UPLOAD_SKIP]

        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
            raise errors.RadioError("Radio didn't go into PROGRAM mode")

        # _unlock() raises RadioError itself on failure and returns nothing.
        self._unlock()

        if blocks:
            LOG.debug("writing blocks %d..%d" % (blocks[0], blocks[-1]))
        total = len(blocks)
        count = 0
        for i in blocks:
            time.sleep(0.001)
            r = self.write_block(i, self._mmap, BLOCK_SIZES.get(i, 256))
            count += 1
            if not r:
                raise errors.RadioError("write of block %i failed" % i)
            self._report_status("Cloning to radio", count, total)

        self.pipe.write("F")
        # clear out blocks we uploaded from the dirty blocks list
        self._dirty_blocks = [b for b in self._dirty_blocks if b not in blocks]

    def command(self, cmd, response_length, timeout=0.5):
        """Send *cmd* followed by a carriage return and return the reply.

        Reads response_length + 1 bytes using *timeout*, restoring the
        pipe's previous timeout afterwards.

        :returns: the reply with surrounding whitespace stripped
        """
        LOG.debug("PC->D72: %s" % cmd)
        default_timeout = self.pipe.timeout
        self.pipe.write(cmd + "\r")
        self.pipe.timeout = timeout
        try:
            data = self.pipe.read(response_length + 1)
            LOG.debug("D72->PC: %s" % data.strip())
        finally:
            self.pipe.timeout = default_timeout
        return data.strip()

    def _mem_slot(self, number):
        """Return the channel record for memory *number*.

        Records are stored six to a group in mem_format, so the number is
        split into a group index and a position within the group.
        """
        bank, idx = divmod(number, 6)
        return self._memobj.memory[bank].mem[idx]

    def get_raw_memory(self, number):
        """Return the repr of the channel record and flag of *number*."""
        return repr(self._mem_slot(number)) + \
            repr(self._memobj.flag[number])

    def get_id(self):
        """Send the ID command and return the identifier from the reply.

        :raises errors.RadioError: if the reply does not start with "ID "
        """
        r = self.command("ID", 9)
        if r.startswith("ID "):
            return r.split(" ")[1]
        else:
            raise errors.RadioError("No response to ID command")

    def get_memory(self, number):
        """Build a chirp Memory (DVMemory for DV channels) for *number*.

        :param number: channel number, or the name of a special channel
        :raises errors.InvalidMemoryLocation: for unknown names or numbers
                                              outside the accepted range
        """
        if isinstance(number, str):
            try:
                number = thd72.THD72_SPECIAL[number]
            except KeyError:
                raise errors.InvalidMemoryLocation("Unknown channel %s" %
                                                   number)

        if number < 0 or number > (max(thd72.THD72_SPECIAL.values()) + 1):
            raise errors.InvalidMemoryLocation(
                "Number must be between 0 and 999")

        _mem = self._mem_slot(number)
        flag = self._memobj.flag[number]

        # mode is a 4-bit field but MODES has only 8 entries.  The mode of
        # an unused channel may hold any value, and the original indexed
        # MODES before checking for emptiness, so guard the lookup.
        mode_index = int(_mem.mode)
        is_dv = mode_index < len(MODES) and MODES[mode_index] == "DV"
        if is_dv:
            mem = chirp_common.DVMemory()
        else:
            mem = chirp_common.Memory()

        mem.number = number

        if number > 999:
            mem.extd_number = thd72.THD72_SPECIAL_REV[number]
        if flag.disabled == self._FLAG_EMPTY:
            mem.empty = True
            return mem

        mem.name = self.get_channel_name(number)
        mem.freq = int(_mem.freq)
        mem.tmode = thd72.TMODES[int(_mem.tone_mode)]
        mem.rtone = chirp_common.TONES[int(_mem.rtone)]
        mem.ctone = chirp_common.TONES[int(_mem.ctone)]
        mem.dtcs = chirp_common.DTCS_CODES[int(_mem.dtcs)]
        mem.duplex = thd72.DUPLEX[int(_mem.duplex)]
        # plain values, not the bitwise objects, like freq above
        mem.offset = int(_mem.offset)
        mem.mode = MODES[mode_index]
        mem.tuning_step = STEPS[int(_mem.tuning_step)]

        if mem.mode == "DV":
            mem.dv_urcall = str(_mem.urcall)
            mem.dv_rpt1call = str(_mem.rpt1)
            mem.dv_rpt2call = str(_mem.rpt2)
            mem.dv_code = int(_mem.digital_squelch_code)

        # NOTE(review): channel 999 matches neither "< 999" nor "> 999"
        # below; kept as in the original -- confirm whether "<= 999" was
        # intended.
        if number < 999:
            # mem.skip = chirp_common.SKIP_VALUES[int(flag.skip)]
            mem.cross_mode = chirp_common.CROSS_MODES[int(_mem.cross_mode)]
        if number > 999:
            mem.cross_mode = chirp_common.CROSS_MODES[0]
            mem.immutable = ["number", "bank", "extd_number", "cross_mode"]
            if number >= 1020 and number < 1030:
                mem.immutable += ["freq", "offset", "tone", "mode",
                                  "tmode", "ctone", "skip"]  # FIXME: ALL
            else:
                mem.immutable += ["name"]

        return mem

    @staticmethod
    def _index_of(table, value, what):
        """Return the index of *value* in *table* or raise RadioError."""
        try:
            return table.index(value)
        except ValueError:
            raise errors.RadioError("Unsupported %s: %s" % (what, value))

    def set_memory(self, mem):
        """Store the chirp Memory *mem* into the memory image.

        Channels 1020-1029 only have their name updated.  For all other
        channels the flag and channel record are marked dirty and updated.

        :raises errors.InvalidMemoryLocation: for out-of-range numbers
        :raises errors.RadioError: for a mode or tuning step that is not
                                   in MODES / STEPS
        """
        LOG.debug("set_memory(%d)" % mem.number)
        if mem.number < 0 or mem.number > (max(thd72.THD72_SPECIAL.values()) + 1):
            raise errors.InvalidMemoryLocation(
                "Number must be between 0 and 999")

        # weather channels can only change name, nothing else
        if mem.number >= 1020 and mem.number < 1030:
            self.set_channel_name(mem.number, mem.name)
            return

        flag = self._memobj.flag[mem.number]
        self.add_dirty_block(flag)

        # only delete non-WX channels
        was_empty = flag.disabled == self._FLAG_EMPTY
        if mem.empty:
            flag.disabled = self._FLAG_EMPTY
            return
        flag.disabled = 0

        # The original indexed memory[mem.number] directly, which addresses
        # a whole six-channel group rather than the channel's own record.
        _mem = self._mem_slot(mem.number)
        self.add_dirty_block(_mem)
        if was_empty:
            # NOTE(review): initialize() is inherited from the TH-D72
            # driver -- confirm it suits the 40-byte record used here.
            self.initialize(_mem)

        _mem.freq = mem.freq

        if mem.number < 999:
            self.set_channel_name(mem.number, mem.name)

        _mem.tone_mode = thd72.TMODES_REV[mem.tmode]
        _mem.rtone = chirp_common.TONES.index(mem.rtone)
        _mem.ctone = chirp_common.TONES.index(mem.ctone)
        _mem.dtcs = chirp_common.DTCS_CODES.index(mem.dtcs)
        _mem.cross_mode = chirp_common.CROSS_MODES.index(mem.cross_mode)
        _mem.duplex = thd72.DUPLEX_REV[mem.duplex]
        _mem.offset = mem.offset
        # Encode with the same tables get_memory decodes with (the original
        # used the TH-D72 mode table and never stored the tuning step).
        _mem.mode = self._index_of(MODES, mem.mode, "mode")
        _mem.tuning_step = self._index_of(STEPS, mem.tuning_step,
                                          "tuning step")

        # The original also assigned flag.prog_vfo, _mem.unknown1,
        # _mem.unknown2 and flag.skip.  None of those fields exist in
        # mem_format, so the assignments have been dropped.
        # TODO: store the skip setting once its location is known.
# (5-5/10)