Project

General

Profile

New Model #4933 » bf-t1 (hwh).py

Hank's modified version of Pavel's driver. - Harold Hankins, 12/11/2017 03:54 PM

 
# Copyright 2017 Pavel Milanes, CO7WT, <pavelmc@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
import struct
import logging

LOG = logging.getLogger(__name__)

from time import sleep
from chirp import chirp_common, directory, memmap
from chirp import bitwise, errors, util
from textwrap import dedent

# A note about the memory in these radios
# (mainly speculation until proven otherwise):
#
# The '9100' OEM software only manipulates the lower 0x180 bytes on read/write
# operations. As far as we know, the file generated by the OEM software IS NOT
# an exact eeprom image; it's a crude text file with a pseudo-CSV format.

# Number of bytes transferred on read/write operations
MEM_SIZE = 0x180 # 384 bytes
# Payload bytes carried by each data frame (a 4 byte header precedes them)
BLOCK_SIZE = 0x10
# Byte exchanged as the positive acknowledge in the handshake and on writes
ACK_CMD = "\x06"
MODES = ["FM", "NFM"]
SKIP_VALUES = ["S", ""]

# This is a general serial timeout for all serial read functions.
# Practice has shown that about 0.7 sec is enough to cover all radios;
# the value used below is a bit above that.
STIMEOUT = 1

# This var controls the verbosity of the debug log: when True the raw serial
# traffic is hex-dumped, producing a very verbose debug.log.
# NOTE(review): the original note says the default is low (False) but the
# value is currently True -- confirm which one is intended before release.
debug = True

##### ID strings #####################################################

# BF-T1 handheld
BFT1_magic = "\x05PROGRAM"
BFT1_ident = "\x20\x42\x46\x39\x31\x30\x30\x53" # " BF9100S"


def _clean_buffer(radio):
    """Drain the serial read buffer, with a hard cap on the amount drained.

    The serial timeout is lowered while flushing and is always restored to
    STIMEOUT afterwards, whether the flush succeeds or fails.

    Raises errors.RadioError with a "check your serial port" message if the
    port keeps delivering data (more than 101 bytes), or with a generic
    message on any other failure while reading.
    """

    # touching the serial timeout to optimize the flushing
    radio.pipe.timeout = 0.1
    datacount = 0

    try:
        while True:
            dump = radio.pipe.read(100)
            if len(dump) == 0:
                # nothing more pending, the buffer is clean
                break

            datacount += len(dump)
            # hard limit to survive an infinite serial data stream,
            # about 5 times bigger than a normal rx block (20 bytes)
            if datacount > 101:
                seriale = "Please check your serial port selection."
                raise errors.RadioError(seriale)

    except errors.RadioError:
        # keep our own, more specific, message instead of masking it
        raise
    except Exception:
        raise errors.RadioError("Unknown error cleaning the serial buffer")
    finally:
        # restore the default serial timeout, even on failure
        radio.pipe.timeout = STIMEOUT


def _rawrecv(radio, amount=0):
    """Raw read from the radio device.

    radio:  object whose `pipe` attribute is the serial port to read from
    amount: number of bytes to ask for; 0 means call read() with no argument

    Returns the data read.

    Raises errors.RadioError("Error reading data from radio") if the read (or
    the debug dump of it) fails, and errors.RadioError("No data received from
    radio") if the read succeeds but returns nothing.
    """

    try:
        if amount == 0:
            data = radio.pipe.read()
        else:
            data = radio.pipe.read(amount)

        # DEBUG
        if debug is True:
            LOG.debug("<== (%d) bytes:\n\n%s" %
                      (len(data), util.hexprint(data)))

    except Exception:
        # only real failures end here; KeyboardInterrupt/SystemExit pass
        raise errors.RadioError("Error reading data from radio")

    # fail if no data is received; checked outside the try block so this
    # specific message is not replaced by the generic one above
    if len(data) == 0:
        raise errors.RadioError("No data received from radio")

    return data


def _send(radio, data):
    """Send data to the radio device.

    radio: object whose `pipe` attribute is the serial port to write to
    data:  the bytes to write

    Raises errors.RadioError if the write (or the debug dump of it) fails.
    """

    try:
        radio.pipe.write(data)

        # DEBUG
        if debug is True:
            LOG.debug("==> (%d) bytes:\n\n%s" %
                      (len(data), util.hexprint(data)))

    except Exception:
        # narrowed from a bare except so that KeyboardInterrupt/SystemExit
        # are not converted into a RadioError
        raise errors.RadioError("Error sending data to radio")


def _make_frame(cmd, addr, data=""):
    """Build a frame: 4 byte header plus the optional payload.

    The header is packed big-endian as: command char (1 byte), address
    (2 bytes) and BLOCK_SIZE (1 byte). When `data` is not empty it is
    appended right after the header.
    """
    header = struct.pack(">BHB", ord(cmd), addr, BLOCK_SIZE)

    # header only frame, nothing to append
    if len(data) == 0:
        return header

    return header + data


def _recv(radio, addr):
    """Read one block from the radio and return its payload.

    A full answer is 4 bytes of header plus BLOCK_SIZE bytes of data. The
    header must carry the "W" command, the requested address and BLOCK_SIZE
    as length, otherwise errors.RadioError is raised. A block shorter or
    longer than expected also raises errors.RadioError.
    """

    # size of a complete answer: header + payload
    expected = BLOCK_SIZE + 4

    block = _rawrecv(radio, expected)
    received = len(block)

    # short answer
    if received < expected:
        raise errors.RadioError("Wrong block length (short) at 0x%04x" % addr)

    # long answer
    if received > expected:
        raise errors.RadioError("Wrong block length (long) at 0x%04x" % addr)

    # split the header in its fields and validate each one
    c, a, l = struct.unpack(">cHB", block[:4])
    header_ok = (c == "W" and a == addr and l == BLOCK_SIZE)
    if not header_ok:
        LOG.debug("Invalid header for block 0x%04x:" % addr)
        LOG.debug("CMD: %s ADDR: %04x SIZE: %02x" % (c, a, l))
        raise errors.RadioError("Invalid header for block 0x%04x:" % addr)

    # what follows the header is the payload
    return block[4:]


def _start_clone_mode(radio, status):
    """Put the radio in clone mode, 3 tries.

    On each try the magic string (radio._magic) is sent and one byte is read
    back; the try succeeds if that byte is ACK_CMD. Progress is reported via
    radio.status_fn(status).

    Returns True as soon as the ACK is received, False if none of the tries
    got it.

    Raises errors.RadioError if cleaning the buffer or sending the magic
    fails.
    """

    # cleaning the serial buffer
    _clean_buffer(radio)

    # prep the data to show in the UI
    status.cur = 0
    status.msg = "Identifying the radio..."
    status.max = 3
    radio.status_fn(status)

    try:
        for attempt in range(0, status.max):
            # Update the UI
            status.cur = attempt + 1
            radio.status_fn(status)

            # send the magic word
            _send(radio, radio._magic)

            # Now you get a x06 of ACK if all goes well.
            # _rawrecv raises RadioError when nothing is received; that
            # must not abort the remaining tries, so go for the next one.
            try:
                ack = _rawrecv(radio, 1)
            except errors.RadioError as e:
                LOG.debug("No answer to magic on try %d: %s" %
                          (attempt + 1, e))
                continue

            if ack == ACK_CMD:
                # DEBUG
                LOG.info("Magic ACK received")
                status.cur = status.max
                radio.status_fn(status)

                return True

        return False

    except errors.RadioError:
        raise
    except Exception as e:
        raise errors.RadioError("Error sending Magic to radio:\n%s" % e)


def _do_ident(radio, status):
    """Set up the serial port, enter PROGRAM mode and identify the radio.

    Returns True when the radio answered with the expected ident string
    (radio._id) and acknowledged the handshake; every failure along the way
    is reported by raising errors.RadioError.
    """
    # set the serial discipline (default): 9600 8N1 with the general timeout
    port = radio.pipe
    port.baudrate = 9600
    port.parity = "N"
    port.bytesize = 8
    port.stopbits = 1
    port.timeout = STIMEOUT
    port.flush()

    # open the radio into program mode
    entered = _start_clone_mode(radio, status)
    if entered is False:
        raise errors.RadioError("Radio did not enter clone mode, wrong model?")

    # Ok, poke it to get the ident string
    expected_id = radio._id
    _send(radio, "\x02")
    ident = _rawrecv(radio, len(expected_id))

    # basic check for the ident
    if len(ident) != len(expected_id):
        raise errors.RadioError("Radio send a odd identification block.")

    # check if ident is OK
    if ident != expected_id:
        LOG.debug("Incorrect model ID, got this:\n\n" + util.hexprint(ident))
        raise errors.RadioError("Radio identification failed.")

    # handshake
    _send(radio, ACK_CMD)
    ack = _rawrecv(radio, 1)

    # checking handshake, bail out on anything that is not a single ACK
    if not (len(ack) == 1 and ack == ACK_CMD):
        LOG.debug("Radio handshake failed.")
        raise errors.RadioError("Radio handshake failed.")

    # DEBUG
    LOG.info("ID ACK received")
    LOG.info("Positive ident, this is a %s %s" % (radio.VENDOR, radio.MODEL))

    return True


def _download(radio):
    """Get the memory map from the radio.

    Identifies the radio and then requests MEM_SIZE bytes in BLOCK_SIZE
    chunks, reporting the progress via radio.status_fn().

    Returns the concatenation of all the block payloads.

    Raises errors.RadioError (from the helpers it calls) on any
    identification or transfer failure.
    """

    # UI progress
    status = chirp_common.Status()

    # put radio in program mode and identify it
    _do_ident(radio, status)

    # reset the progress bar in the UI; floor division keeps it an integer
    # regardless of the division semantics in effect
    status.max = MEM_SIZE // BLOCK_SIZE
    status.msg = "Cloning from radio..."
    status.cur = 0
    radio.status_fn(status)

    # cleaning the serial buffer
    _clean_buffer(radio)

    # increasing the timeout in the "discovery" process.
    radio.pipe.timeout = 3  # 3 seconds.

    # collect the blocks and join them once at the end
    blocks = []
    for addr in range(0, MEM_SIZE, BLOCK_SIZE):
        # sending the read request
        _send(radio, _make_frame("R", addr))

        # read and keep the payload
        blocks.append(_recv(radio, addr))

        # UI Update: number of blocks done so far, reaches status.max on
        # the last block
        status.cur = addr // BLOCK_SIZE + 1
        status.msg = "Cloning from radio..."
        radio.status_fn(status)

    return "".join(blocks)


def _upload(radio):
    """Upload procedure.

    Identifies the radio and then writes the first MEM_SIZE bytes of the
    radio's memory map in BLOCK_SIZE chunks, expecting an ACK_CMD byte after
    each one. Progress is reported via radio.status_fn().

    Raises errors.RadioError if a block is not acknowledged, or (from the
    helpers it calls) on any identification or transfer failure.
    """

    # UI progress
    status = chirp_common.Status()

    # put radio in program mode and identify it
    # (_do_ident takes only the radio and the status object)
    _do_ident(radio, status)

    # get the data to upload to radio
    data = radio.get_mmap()

    # Reset the UI progress
    status.max = MEM_SIZE // BLOCK_SIZE
    status.cur = 0
    status.msg = "Cloning to radio..."
    radio.status_fn(status)

    # cleaning the serial buffer
    _clean_buffer(radio)

    # the fun start here
    for addr in range(0, MEM_SIZE, BLOCK_SIZE):
        # getting the block of data to send
        d = data[addr:addr + BLOCK_SIZE]

        # build the frame to send; the block length is added to the header
        # by _make_frame itself
        frame = _make_frame("W", addr, d)

        # send the frame
        _send(radio, frame)

        # receiving the response
        ack = _rawrecv(radio, 1)

        # basic check
        if len(ack) != 1:
            raise errors.RadioError("No ACK when writing block 0x%04x" % addr)

        if ack != ACK_CMD:
            raise errors.RadioError("Bad ACK writing block 0x%04x:" % addr)

        # UI Update: number of blocks done so far, reaches status.max on
        # the last block
        status.cur = addr // BLOCK_SIZE + 1
        status.msg = "Cloning to radio..."
        radio.status_fn(status)


def _split(rf, f1, f2):
"""Returns False if the two freqs are in the same band (no split)
or True otherwise"""

# determine if the two freqs are in the same band
for low, high in rf.valid_bands:
if f1 >= low and f1 <= high and f2 >= low and f2 <= high:
# if the two freqs are on the same Band this is not a split
return False

# if you get here is because the freq pairs are split
return True


#~ def model_match(cls, data):
#~ """Match the opened/downloaded image to the correct version"""
#~ # by now just size match

#~ return False


# Layout of the image as understood so far: 20 channel records of 16 bytes
# each (1 + 3 + 4 + 3 + 5), starting at offset 0. Only the rx and tx
# frequency fields (3 byte `lbcd` each) are identified; the other bytes are
# still unknown. The records cover 0x140 of the 0x180 bytes of the image.
MEM_FORMAT = """
#seekto 0x0000;
struct {
u8 unknown0;
lbcd rxfreq[3];
u8 unknown1[4];
lbcd txfreq[3];
u8 unknown2[5];
} memory[20];
"""


@directory.register
class BFT1(chirp_common.CloneModeRadio, chirp_common.ExperimentalRadio):
    """Baofeng BF-T1 radio & possibly alike radios"""
    VENDOR = "Baofeng"
    MODEL = "BF-T1"
    _power_levels = [chirp_common.PowerLevel("High", watts=5),
                     chirp_common.PowerLevel("Low", watts=1)]
    _vhf_range = (136000000, 174000000)
    _uhf_range = (400000000, 470000000)
    # highest channel number; channels go from 0 to _upper
    _upper = 19
    _magic = BFT1_magic
    _id = BFT1_ident

    @classmethod
    def get_prompts(cls):
        """Return the prompts shown to the user by the UI"""
        rp = chirp_common.RadioPrompts()
        rp.experimental = \
            ('This driver is experimental.\n'
             '\n'
             'Please keep a copy of your memories with the original software '
             'if you treasure them, this driver is new and may contain'
             ' bugs.\n'
             '\n'
             )
        rp.pre_download = _(dedent("""\
            Follow these instructions to download your info:

            1 - Turn off your radio
            2 - Connect your interface cable
            3 - Turn on your radio
            4 - Do the download of your radio data

            """))
        rp.pre_upload = _(dedent("""\
            Follow these instructions to upload your info:

            1 - Turn off your radio
            2 - Connect your interface cable
            3 - Turn on your radio
            4 - Do the upload of your radio data

            """))
        return rp

    def get_features(self):
        """Get the radio's features"""

        # we will use the following var as global
        global POWER_LEVELS

        rf = chirp_common.RadioFeatures()
        #~ rf.has_settings = True
        #~ rf.has_bank = False
        #~ rf.has_tuning_step = False
        #~ rf.can_odd_split = True
        #~ rf.has_name = True
        rf.has_offset = True
        rf.has_mode = True
        rf.valid_modes = MODES
        #~ rf.has_dtcs = True
        #~ rf.has_rx_dtcs = True
        #~ rf.has_dtcs_polarity = True
        #~ rf.has_ctone = True
        #~ rf.has_cross = True
        #~ rf.valid_characters = VALID_CHARS
        #~ rf.valid_name_length = self.NAME_LENGTH
        rf.valid_duplexes = ["", "-", "+", "split", "off"]
        #~ rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross']
        #~ rf.valid_cross_modes = [
        #~ "Tone->Tone",
        #~ "DTCS->",
        #~ "->DTCS",
        #~ "Tone->DTCS",
        #~ "DTCS->Tone",
        #~ "->Tone",
        #~ "DTCS->DTCS"]
        rf.valid_skips = SKIP_VALUES
        #~ rf.valid_dtcs_codes = DTCS
        rf.memory_bounds = (0, self._upper)

        # power levels
        POWER_LEVELS = self._power_levels
        rf.valid_power_levels = POWER_LEVELS

        # normal dual bands
        rf.valid_bands = [self._vhf_range, self._uhf_range]

        return rf

    def process_mmap(self):
        """Process the mem map into the mem object"""

        # Get it
        self._memobj = bitwise.parse(MEM_FORMAT, self._mmap)

    def sync_in(self):
        """Download from radio"""
        data = _download(self)
        self._mmap = memmap.MemoryMap(data)
        self.process_mmap()

    def sync_out(self):
        """Upload to radio (disabled by now, always raises RadioError)"""

        #~ try:
        #~ _upload(self)
        #~ except errors.RadioError:
        #~ raise
        #~ except Exception as e:
        #~ raise errors.RadioError("Error: %s" % e)

        # upload disabled by now
        raise errors.RadioError("Error: This is a dev driver, no upload yet.")

    def get_raw_memory(self, number):
        """Return the repr() of the raw channel record `number`"""
        return repr(self._memobj.memory[number])

    def get_memory(self, number):
        """Get the mem representation from the radio image

        A channel whose first raw byte is 0xFF is reported as empty. The
        tx freq is considered not set (duplex "off") when its first raw
        byte (offset 8 of the record) is 0xFF.
        """
        _mem = self._memobj.memory[number]

        # Create a high-level memory object to return to the UI
        mem = chirp_common.Memory()

        # Memory number
        mem.number = number

        if _mem.get_raw()[0] == "\xFF":
            mem.empty = True
            return mem

        # Freq and offset, the stored value is multiplied by 1000
        mem.freq = int(_mem.rxfreq) * 1000
        # tx freq can be blank
        if _mem.get_raw()[8] == "\xFF":
            # TX freq not set
            mem.offset = 0
            mem.duplex = "off"
        else:
            # TX freq set
            txfreq = int(_mem.txfreq) * 1000
            offset = txfreq - mem.freq
            if offset != 0:
                if _split(self.get_features(), mem.freq, txfreq):
                    mem.duplex = "split"
                    mem.offset = txfreq
                elif offset < 0:
                    mem.offset = abs(offset)
                    mem.duplex = "-"
                elif offset > 0:
                    mem.offset = offset
                    mem.duplex = "+"
            else:
                mem.offset = 0

        #~ # power
        #~ mem.power = POWER_LEVELS[int(_mem.power)]

        #~ # wide/narrow
        #~ mem.mode = MODES[int(_mem.wide)]

        #~ # skip
        #~ mem.skip = SKIP_VALUES[_mem.add]

        #~ # tone data
        #~ rxtone = txtone = None
        #~ txtone = self._decode_tone(_mem.txtone)
        #~ rxtone = self._decode_tone(_mem.rxtone)
        #~ chirp_common.split_tone_decode(mem, txtone, rxtone)

        return mem

    def set_memory(self, mem):
        """Set the memory data in the eeprom img from the UI

        An empty memory blanks the whole 16 byte record with 0xFF and
        nothing else is written. Otherwise the rx freq is stored and the tx
        freq is derived from the duplex/offset pair (blanked with 0xFF when
        duplex is "off").
        """
        # get the eeprom representation of this channel
        # (MEM_FORMAT declares no names table, so only the channel record
        # is touched here)
        _mem = self._memobj.memory[mem.number]

        # if empty memory
        if mem.empty:
            # the channel itself
            _mem.set_raw("\xFF" * 16)

            # nothing more to do; writing the freqs now would un-blank it
            return mem

        # NOTE(review): get_memory() treats a record whose first byte
        # (unknown0) is 0xFF as empty and that byte is not written here, so
        # a channel stored over a blank record may still read back as empty
        # -- confirm once the meaning of unknown0 is known.

        # frequency, floor division keeps the values as integers
        _mem.rxfreq = mem.freq // 1000

        # duplex
        if mem.duplex == "+":
            _mem.txfreq = (mem.freq + mem.offset) // 1000
        elif mem.duplex == "-":
            _mem.txfreq = (mem.freq - mem.offset) // 1000
        elif mem.duplex == "off":
            for i in _mem.txfreq:
                i.set_raw("\xFF")
        elif mem.duplex == "split":
            # on split the offset holds the tx freq itself
            _mem.txfreq = mem.offset // 1000
        else:
            _mem.txfreq = mem.freq // 1000

        #~ # tone data
        #~ ((txmode, txtone, txpol), (rxmode, rxtone, rxpol)) = \
        #~ chirp_common.split_tone_encode(mem)
        #~ self._encode_tone(_mem.txtone, txmode, txtone, txpol)
        #~ self._encode_tone(_mem.rxtone, rxmode, rxtone, rxpol)

        return mem

    @classmethod
    def match_model(cls, filedata, filename):
        """Tell if the image in `filedata` belongs to this driver

        By now the only test is the size: returns True when the length of
        `filedata` is exactly MEM_SIZE, False otherwise.
        """
        LOG.debug("len file/mem %i/%i" % (len(filedata), MEM_SIZE))

        # testing the file data size
        match_size = len(filedata) == MEM_SIZE

        # DEBUG
        if match_size and debug is True:
            LOG.debug("BF-T1 matched!")

        # testing the firmware model fingerprint
        #~ match_model = model_match(cls, filedata)

        return match_size  # and match_model
(22-22/77)