1
|
# Copyright 2008 Dan Smith <dsmith@danplanet.com>
|
2
|
#
|
3
|
# This program is free software: you can redistribute it and/or modify
|
4
|
# it under the terms of the GNU General Public License as published by
|
5
|
# the Free Software Foundation, either version 3 of the License, or
|
6
|
# (at your option) any later version.
|
7
|
#
|
8
|
# This program is distributed in the hope that it will be useful,
|
9
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
10
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
11
|
# GNU General Public License for more details.
|
12
|
#
|
13
|
# You should have received a copy of the GNU General Public License
|
14
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
15
|
|
16
|
import os
|
17
|
import csv
|
18
|
import logging
|
19
|
|
20
|
from chirp import chirp_common, errors, directory
|
21
|
|
22
|
# Module-level logger used by this driver for load-time diagnostics
LOG = logging.getLogger(__name__)
|
23
|
# Power level (50W) given to memories that do not carry one of their own
DEFAULT_POWER_LEVEL = chirp_common.AutoNamedPowerLevel(50)
|
24
|
|
25
|
|
26
|
class OmittedHeaderError(Exception):
    """Internal signal that a requested CSV column is not available.

    Raised by get_datum_by_header() both when the header is absent from the
    file and when an individual line is too short to contain that column.
    """
|
29
|
|
30
|
|
31
|
def get_datum_by_header(headers, data, header):
    """Look up the cell of @data that sits under column @header.

    @headers is the list of column names and @data is one parsed line, in
    the same column order. Raises OmittedHeaderError if @header is not one
    of @headers, or if @data is too short to contain that column.
    """
    if header in headers:
        column = headers.index(header)
        try:
            return data[column]
        except IndexError:
            # The column exists in the file, but this line stops short of it
            raise OmittedHeaderError("Header %s not provided on this line" %
                                     header)

    raise OmittedHeaderError("Header %s not provided" % header)
|
41
|
|
42
|
|
43
|
def write_memory(writer, mem):
    """Emit @mem as one row through @writer, skipping empty memories.

    The row content is whatever @mem.to_csv() returns.
    """
    if not mem.empty:
        writer.writerow(mem.to_csv())
|
48
|
|
49
|
|
50
|
def parse_cross_mode(value):
    """Return @value unchanged if it is a known cross mode.

    Raises ValueError when @value is not in chirp_common.CROSS_MODES.
    """
    if value in chirp_common.CROSS_MODES:
        return value

    raise ValueError('Invalid cross mode %r' % value)
|
54
|
|
55
|
|
56
|
@directory.register
class CSVRadio(chirp_common.FileBackedRadio):
    """A driver for Generic CSV files.

    Memories live in self.memories, a list indexed by memory number.
    Problems found on individual lines while loading are collected in
    self.errors instead of aborting the whole load.
    """
    VENDOR = "Generic"
    MODEL = "CSV"
    FILE_EXTENSION = "csv"
    FORMATS = [directory.register_format('CSV', '*.csv')]

    # Column header -> (converter applied to the cell text, Memory attribute)
    ATTR_MAP = {
        "Location": (int, "number"),
        "Name": (str, "name"),
        "Frequency": (chirp_common.parse_freq, "freq"),
        "Duplex": (str, "duplex"),
        "Offset": (chirp_common.parse_freq, "offset"),
        "Tone": (str, "tmode"),
        "rToneFreq": (float, "rtone"),
        "cToneFreq": (float, "ctone"),
        "DtcsCode": (int, "dtcs"),
        "DtcsPolarity": (str, "dtcs_polarity"),
        "RxDtcsCode": (int, "rx_dtcs"),
        "CrossMode": (parse_cross_mode, "cross_mode"),
        "Mode": (str, "mode"),
        "TStep": (float, "tuning_step"),
        "Skip": (str, "skip"),
        "Power": (chirp_common.parse_power, "power"),
        "Comment": (str, "comment"),
    }

    def _blank(self, setDefault=False):
        """Reset to 1000 empty memories and clear the error list.

        When @setDefault is true, memory 0 is made non-empty with a
        frequency of 146010000 and the default power level.
        """
        self.errors = []
        self.memories = [chirp_common.Memory(i, True) for i in range(0, 1000)]
        if setDefault:
            self.memories[0].empty = False
            self.memories[0].freq = 146010000
            # Default to 50W
            self.memories[0].power = DEFAULT_POWER_LEVEL

    def __init__(self, pipe):
        """Create the radio from the file named by @pipe.

        If @pipe names an existing file it is loaded; otherwise a blank
        image with a default memory 0 is created.
        """
        chirp_common.FileBackedRadio.__init__(self, None)
        self.memories = []
        self.file_has_rTone = None  # Set in load(), used in _clean_tmode()
        self.file_has_cTone = None

        # Persistence for comment lines
        # List of tuples of (previous_memory, comment)
        self._comments = []

        self._filename = pipe
        if self._filename and os.path.exists(self._filename):
            self.load()
        else:
            self._blank(True)

    def get_features(self):
        """Return the RadioFeatures describing what a CSV file can hold."""
        rf = chirp_common.RadioFeatures()
        rf.has_bank = False
        rf.requires_call_lists = False
        rf.has_implicit_calls = False
        # Bounds track the current size of the memory list
        rf.memory_bounds = (0, len(self.memories)-1)
        rf.has_infinite_number = True
        rf.has_nostep_tuning = True
        rf.has_comment = True
        rf.has_rx_dtcs = True
        rf.has_variable_power = True
        rf.can_odd_split = True

        rf.valid_modes = list(chirp_common.MODES)
        rf.valid_tmodes = list(chirp_common.TONE_MODES)
        rf.valid_cross_modes = list(chirp_common.CROSS_MODES)
        rf.valid_duplexes = ["", "-", "+", "split", "off"]
        rf.valid_tuning_steps = list(chirp_common.TUNING_STEPS)
        rf.valid_bands = [(1, 10000000000)]
        rf.valid_skips = ["", "S"]
        rf.valid_characters = chirp_common.CHARSET_1252
        rf.valid_name_length = 999
        rf.valid_power_levels = [chirp_common.AutoNamedPowerLevel(0.1),
                                 DEFAULT_POWER_LEVEL,
                                 chirp_common.AutoNamedPowerLevel(1500)]

        return rf

    def _clean(self, headers, line, mem):
        """Runs post-processing functions on new mem objects.

        This is useful for parsing other CSV dialects when multiple columns
        convert to a single Chirp column.

        For every attribute name of @mem, a method named _clean_<attribute>
        is called if this class defines one; each hook receives and returns
        the memory.
        """

        for attr in dir(mem):
            fname = "_clean_%s" % attr
            if hasattr(self, fname):
                mem = getattr(self, fname)(headers, line, mem)

        return mem

    def _clean_tmode(self, headers, line, mem):
        """ If there is exactly one of [rToneFreq, cToneFreq] columns in the
        csv file, use it for both rtone & ctone. Makes TSQL use friendlier."""

        if self.file_has_rTone and not self.file_has_cTone:
            mem.ctone = mem.rtone
        elif self.file_has_cTone and not self.file_has_rTone:
            mem.rtone = mem.ctone

        return mem

    def _parse_csv_data_line(self, headers, line):
        """Build a memory object from one data @line of the file.

        Columns not listed in ATTR_MAP are ignored, as are columns missing
        from this line. Any conversion failure is re-raised as an Exception
        whose message is prefixed with the attribute being set.
        """
        mem = chirp_common.Memory()
        try:
            # DV rows get the DV-specific memory class
            if get_datum_by_header(headers, line, "Mode") == "DV":
                mem = chirp_common.DVMemory()
        except OmittedHeaderError:
            pass

        for header in headers:
            try:
                typ, attr = self.ATTR_MAP[header]
            except KeyError:
                continue
            try:
                val = get_datum_by_header(headers, line, header)
                if not val and typ == int:
                    # An empty integer cell means "not set", not an error
                    val = None
                else:
                    val = typ(val)
                if hasattr(mem, attr):
                    setattr(mem, attr, val)
            except OmittedHeaderError:
                pass
            except Exception as e:
                raise Exception("[%s] %s" % (attr, e)) from e

        if not mem.power:
            # Default power level to something if not set
            mem.power = DEFAULT_POWER_LEVEL

        return self._clean(headers, line, mem)

    def load(self, filename=None):
        """Load memories from @filename, or from the stored filename.

        Raises errors.RadioError when neither a filename argument nor a
        stored filename is available.
        """
        if filename is None and self._filename is None:
            raise errors.RadioError("Need a location to load from")

        if filename:
            self._filename = filename

        self._blank()

        # utf-8-sig transparently drops a leading BOM if present
        with open(self._filename, newline='', encoding='utf-8-sig') as f:
            return self._load(f)

    def _load(self, f):
        """Parse the CSV text stream @f into self.memories.

        Lines starting with '#' are stashed in self._comments together with
        the number of the memory loaded before them. The first remaining
        line is the header; every later line is a memory. Bad lines are
        logged, recorded in self.errors and skipped. Line numbers in those
        messages count non-comment lines only.

        Raises errors.InvalidDataError if no memory could be loaded.
        """
        reader = csv.reader(f, delimiter=chirp_common.SEPCHAR, quotechar='"')

        self._comments = []
        good = 0
        lineno = 0
        last_number = -1
        for line in reader:
            # Skip (but stash) comment lines that start with #
            if line and line[0].startswith('#'):
                self._comments.append((last_number, ' '.join(line)))
                continue
            lineno += 1
            if lineno == 1:
                header = line
                self.file_has_rTone = "rToneFreq" in header
                self.file_has_cTone = "cToneFreq" in header
                continue

            if len(header) > len(line):
                LOG.error("Line %i has %i columns, expected %i",
                          lineno, len(line), len(header))
                self.errors.append("Column number mismatch on line %i" %
                                   lineno)
                continue

            try:
                mem = self._parse_csv_data_line(header, line)
                if mem.number is None:
                    # The line number is added by the handler below
                    raise Exception("Invalid Location field")
            except Exception as e:
                LOG.error("Line %i: %s", lineno, e)
                self.errors.append("Line %i: %s" % (lineno, e))
                continue

            last_number = mem.number
            self._grow(mem.number)
            self.memories[mem.number] = mem
            good += 1

        if not good:
            raise errors.InvalidDataError("No channels found")

    def save(self, filename=None):
        """Write all non-empty memories and stashed comments as CSV.

        Writes to @filename if given (which then becomes the stored
        filename), otherwise to the stored filename. Raises
        errors.RadioError when neither is available.
        """
        if filename is None and self._filename is None:
            raise errors.RadioError("Need a location to save to")

        if filename:
            self._filename = filename

        with open(self._filename, "w", newline='', encoding='utf-8') as f:
            # Work on a copy so the stashed comments survive repeated saves
            comments = list(self._comments)
            writer = csv.writer(f, delimiter=chirp_common.SEPCHAR)

            # Comments seen before any memory go ahead of the header row
            for index, comment in comments[:]:
                if index >= 0:
                    break
                writer.writerow([comment])
                comments.pop(0)

            writer.writerow(chirp_common.Memory.CSV_FORMAT)

            for mem in self.memories:
                # Emit comments whose preceding memory number is below this
                # memory's number, so they land after the memory they
                # followed in the original file
                for index, comment in comments[:]:
                    if index >= mem.number:
                        break
                    writer.writerow([comment])
                    comments.pop(0)
                write_memory(writer, mem)

    # MMAP compatibility
    def save_mmap(self, filename):
        """Alias for save(), kept for MMAP compatibility."""
        return self.save(filename)

    def load_mmap(self, filename):
        """Alias for load(), kept for MMAP compatibility."""
        return self.load(filename)

    def get_memories(self, lo=0, hi=999):
        """Return the memories numbered from @lo to @hi inclusive."""
        return [x for x in self.memories if lo <= x.number <= hi]

    def get_memory(self, number):
        """Return a copy of memory @number.

        Raises errors.InvalidMemoryLocation if the memory cannot be fetched.
        """
        try:
            return self.memories[number].dupe()
        except Exception as e:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate
            raise errors.InvalidMemoryLocation(
                "No such memory %s" % number) from e

    def _grow(self, target):
        """Extend self.memories with empty memories to cover @target.

        Nothing happens when @target is below the current length. Otherwise
        empty memories are appended until the list holds @target + 2
        entries, i.e. one slot more than is needed to index @target.

        NOTE(review): save() only emits a stashed comment once it reaches a
        memory with a higher number, so the extra trailing slot may be what
        keeps comments after the last memory from being dropped - confirm
        before tightening this.
        """
        delta = target - len(self.memories)
        if delta < 0:
            return

        delta += 1

        for i in range(len(self.memories), len(self.memories) + delta + 1):
            mem = chirp_common.Memory()
            mem.empty = True
            mem.number = i
            self.memories.append(mem)

    def set_memory(self, newmem):
        """Store a copy of @newmem at its own number, growing if needed.

        A missing power level is replaced by the default. Trailing
        whitespace is stripped from the stored name.
        """
        newmem = newmem.dupe()
        if newmem.power is None:
            newmem.power = DEFAULT_POWER_LEVEL
        else:
            # Accept any power level because we are CSV, but convert it to
            # the class that will str() into our desired format.
            newmem.power = chirp_common.AutoNamedPowerLevel(
                chirp_common.dBm_to_watts(float(newmem.power)))
        self._grow(newmem.number)
        self.memories[newmem.number] = newmem
        self.memories[newmem.number].name = newmem.name.rstrip()

    def erase_memory(self, number):
        """Replace memory @number with a fresh empty memory."""
        mem = chirp_common.Memory()
        mem.number = number
        mem.empty = True
        self.memories[number] = mem

    def get_raw_memory(self, number):
        """Return the CSV header and the row for memory @number as text."""
        return ",".join(chirp_common.Memory.CSV_FORMAT) + \
            os.linesep + \
            ",".join(self.memories[number].to_csv())

    @classmethod
    def match_model(cls, filedata, filename):
        """Match files ending in .CSV

        The content must also decode as text and either be empty or have a
        header line starting with 'Location,'.
        """
        try:
            filedata = filedata.decode()
        except UnicodeDecodeError:
            # CSV files are text
            return False
        return filename.lower().endswith("." + cls.FILE_EXTENSION) and \
            (find_csv_header(filedata) or filedata == "")
|
338
|
|
339
|
|
340
|
def find_csv_header(filedata):
    """Return True if @filedata begins with a generic CSV header line.

    A single leading BOM character and any leading lines starting with '#'
    are skipped; the first remaining line must start with 'Location,'.
    Returns False for empty data or data consisting only of comment lines.
    """
    if filedata.startswith(('\ufeff', '\ufffe')):
        # Skip BOM
        filedata = filedata[1:]
    while filedata.startswith('#'):
        # partition() yields '' as the remainder when the comment is not
        # newline-terminated, so the loop always makes progress
        filedata = filedata.partition('\n')[2]
    return filedata.startswith('Location,')
|
347
|
|
348
|
|
349
|
@directory.register
class CommanderCSVRadio(CSVRadio):
    """A driver for reading CSV files generated by KG-UV Commander software"""
    VENDOR = "Commander"
    MODEL = "KG-UV"
    FILE_EXTENSION = "csv"

    # "TX Dev" column value -> Chirp mode
    MODE_MAP = {
        "NARR": "NFM",
        "WIDE": "FM",
    }

    # "Scan" column value -> Chirp skip flag
    SCAN_MAP = {
        "ON": "",
        "OFF": "S"
    }

    # Column header -> (converter applied to the cell text, Memory attribute)
    ATTR_MAP = {
        "#": (int, "number"),
        "Name": (str, "name"),
        "RX Freq": (chirp_common.parse_freq, "freq"),
        "Scan": (lambda v: CommanderCSVRadio.SCAN_MAP.get(v), "skip"),
        "TX Dev": (lambda v: CommanderCSVRadio.MODE_MAP.get(v), "mode"),
        "Group/Notes": (str, "comment"),
    }

    def _clean_number(self, headers, line, mem):
        """Move a memory numbered 0 to the first empty slot.

        NOTE(review): presumably this covers Commander files that have no
        '#' column, where every row would otherwise land on 0 - confirm.
        """
        if mem.number == 0:
            for memory in self.memories:
                if memory.empty:
                    mem.number = memory.number
                    break
        return mem

    def _clean_duplex(self, headers, line, mem):
        """Derive duplex and offset from the 'TX Freq' column.

        An unparseable TX frequency turns transmit off; a TX frequency equal
        to the receive frequency is simplex; any other non-zero value is
        stored as a split with the TX frequency as offset.
        """
        try:
            txfreq = chirp_common.parse_freq(
                get_datum_by_header(headers, line, "TX Freq"))
        except ValueError:
            mem.duplex = "off"
            return mem

        if mem.freq == txfreq:
            mem.duplex = ""
        elif txfreq:
            mem.duplex = "split"
            mem.offset = txfreq

        return mem

    def _clean_tmode(self, headers, line, mem):
        """Derive tone mode and tones from the 'Encode'/'Decode' columns.

        'OFF' means no tone. An encode tone selects "Tone"; a decode tone
        selects "TSQL" and takes precedence. rtone falls back to 88.5 and
        ctone falls back to rtone.
        """
        rtone = get_datum_by_header(headers, line, "Encode")
        ctone = get_datum_by_header(headers, line, "Decode")
        if rtone == "OFF":
            rtone = None
        else:
            rtone = float(rtone)

        if ctone == "OFF":
            ctone = None
        else:
            ctone = float(ctone)

        if rtone:
            mem.tmode = "Tone"
        if ctone:
            mem.tmode = "TSQL"

        mem.rtone = rtone or 88.5
        mem.ctone = ctone or mem.rtone

        return mem

    @classmethod
    def match_model(cls, filedata, filename):
        """Match files ending in .csv and using Commander column names.

        @filedata is the raw bytes of the file. Both known header variants
        (unquoted without a '#' column, and quoted with one) are accepted,
        but only for files with the .csv extension.
        """
        known_headers = (
            b"Name,RX Freq,TX Freq,Decode,Encode,TX Pwr,"
            b"Scan,TX Dev,Busy Lck,Group/Notes",
            b'"#","Name","RX Freq","TX Freq","Decode",'
            b'"Encode","TX Pwr","Scan","TX Dev",'
            b'"Busy Lck","Group/Notes"',
        )
        # The extension check must gate both header variants
        return filename.lower().endswith("." + cls.FILE_EXTENSION) and \
            filedata.startswith(known_headers)
|
431
|
|
432
|
|
433
|
@directory.register
class RTCSVRadio(CSVRadio):
    """A driver for reading CSV files generated by RT Systems software"""
    VENDOR = "RT Systems"
    MODEL = "CSV"
    FILE_EXTENSION = "csv"

    # "Offset Direction" column value -> Chirp duplex
    DUPLEX_MAP = {
        "Minus": "-",
        "Plus": "+",
        "Simplex": "",
        "Split": "split",
    }

    # "Skip" column value -> Chirp skip flag
    SKIP_MAP = {
        "Off": "",
        "On": "S",
        "P Scan": "P",
        "Skip": "S",
    }

    # "Tone Mode" column value -> Chirp tone mode
    TMODE_MAP = {
        "None": "",
        "T Sql": "TSQL",
    }

    # On/Off column value -> bool
    BOOL_MAP = {
        "Off": False,
        "On": True,
    }

    # Column header -> (converter applied to the cell text, Memory attribute).
    # The map-based converters pass unknown values through unchanged; the
    # CTCSS and Step converters keep only the number before the first space.
    ATTR_MAP = {
        "Channel Number": (int, "number"),
        "Receive Frequency": (chirp_common.parse_freq, "freq"),
        "Offset Frequency": (chirp_common.parse_freq, "offset"),
        "Offset Direction": (
            lambda raw: RTCSVRadio.DUPLEX_MAP.get(raw, raw), "duplex"),
        "Operating Mode": (str, "mode"),
        "Name": (str, "name"),
        "Tone Mode": (
            lambda raw: RTCSVRadio.TMODE_MAP.get(raw, raw), "tmode"),
        "CTCSS": (
            lambda raw: float(raw.split(" ")[0]), "rtone"),
        "DCS": (int, "dtcs"),
        "Skip": (
            lambda raw: RTCSVRadio.SKIP_MAP.get(raw, raw), "skip"),
        "Step": (
            lambda raw: float(raw.split(" ")[0]), "tuning_step"),
        "Mask": (
            lambda raw: RTCSVRadio.BOOL_MAP.get(raw, raw), "empty",),
        "Comment": (str, "comment"),
    }

    def _clean_duplex(self, headers, line, mem):
        """For split memories, take the offset from 'Transmit Frequency'.

        Memories that are not split, and files without that column, are
        returned untouched.
        """
        if mem.duplex != "split":
            return mem

        try:
            tx_text = get_datum_by_header(headers, line, "Transmit Frequency")
            mem.offset = chirp_common.parse_freq(tx_text)
        except OmittedHeaderError:
            pass

        return mem

    def _clean_mode(self, headers, line, mem):
        """Switch FM memories to "FMN" when the 'Half Dev' column is On.

        Files without a 'Half Dev' column leave the mode as it is.
        NOTE(review): the sibling Commander driver spells narrow FM as
        "NFM" - confirm that "FMN" is an accepted mode name.
        """
        if mem.mode != "FM":
            return mem

        try:
            half_dev = get_datum_by_header(headers, line, "Half Dev")
            if self.BOOL_MAP[half_dev]:
                mem.mode = "FMN"
        except OmittedHeaderError:
            pass

        return mem

    def _clean_ctone(self, headers, line, mem):
        """Mirror rtone into ctone."""
        # RT Systems only stores a single tone value
        mem.ctone = mem.rtone
        return mem

    @classmethod
    def match_model(cls, filedata, filename):
        """Match files ending in .csv and using RT Systems column names."""
        # RT Systems provides a different set of columns for each radio.
        # We attempt to match only the first few columns, hoping they are
        # consistent across radio models.
        try:
            text = filedata.decode()
        except UnicodeDecodeError:
            # CSV files are text
            return False

        leading_columns = ("Channel Number,Receive Frequency,"
                           "Transmit Frequency,Offset Frequency,"
                           "Offset Direction,Operating Mode,"
                           "Name,Tone Mode,CTCSS,DCS")
        has_extension = filename.lower().endswith("." + cls.FILE_EXTENSION)
        return has_extension and text.startswith(leading_columns)
|