Project

General

Profile

Bug #11303 » generic_csv.py

fea88606 - Dan Smith, 04/14/2024 05:46 PM

 
1
# Copyright 2008 Dan Smith <dsmith@danplanet.com>
2
#
3
# This program is free software: you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation, either version 3 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
15

    
16
import os
17
import csv
18
import logging
19

    
20
from chirp import chirp_common, errors, directory
21

    
22
LOG = logging.getLogger(__name__)
23
DEFAULT_POWER_LEVEL = chirp_common.AutoNamedPowerLevel(50)
24

    
25

    
26
class OmittedHeaderError(Exception):
27
    """Internal exception to signal that a column has been omitted"""
28
    pass
29

    
30

    
31
def get_datum_by_header(headers, data, header):
32
    """Return the column corresponding to @headers[@header] from @data"""
33
    if header not in headers:
34
        raise OmittedHeaderError("Header %s not provided" % header)
35

    
36
    try:
37
        return data[headers.index(header)]
38
    except IndexError:
39
        raise OmittedHeaderError("Header %s not provided on this line" %
40
                                 header)
41

    
42

    
43
def write_memory(writer, mem):
44
    """Write @mem using @writer if not empty"""
45
    if mem.empty:
46
        return
47
    writer.writerow(mem.to_csv())
48

    
49

    
50
def parse_cross_mode(value):
51
    if value not in chirp_common.CROSS_MODES:
52
        raise ValueError('Invalid cross mode %r' % value)
53
    return value
54

    
55

    
56
@directory.register
57
class CSVRadio(chirp_common.FileBackedRadio):
58
    """A driver for Generic CSV files"""
59
    VENDOR = "Generic"
60
    MODEL = "CSV"
61
    FILE_EXTENSION = "csv"
62
    FORMATS = [directory.register_format('CSV', '*.csv')]
63

    
64
    ATTR_MAP = {
65
        "Location":      (int,   "number"),
66
        "Name":          (str,   "name"),
67
        "Frequency":     (chirp_common.parse_freq, "freq"),
68
        "Duplex":        (str,   "duplex"),
69
        "Offset":        (chirp_common.parse_freq, "offset"),
70
        "Tone":          (str,   "tmode"),
71
        "rToneFreq":     (float, "rtone"),
72
        "cToneFreq":     (float, "ctone"),
73
        "DtcsCode":      (int,   "dtcs"),
74
        "DtcsPolarity":  (str,   "dtcs_polarity"),
75
        "RxDtcsCode":    (int,   "rx_dtcs"),
76
        "CrossMode":     (parse_cross_mode, "cross_mode"),
77
        "Mode":          (str,   "mode"),
78
        "TStep":         (float, "tuning_step"),
79
        "Skip":          (str,   "skip"),
80
        "Power":         (chirp_common.parse_power, "power"),
81
        "Comment":       (str,   "comment"),
82
        }
83

    
84
    def _blank(self, setDefault=False):
85
        self.errors = []
86
        self.memories = [chirp_common.Memory(i, True) for i in range(0, 1000)]
87
        if (setDefault):
88
            self.memories[0].empty = False
89
            self.memories[0].freq = 146010000
90
            # Default to 50W
91
            self.memories[0].power = DEFAULT_POWER_LEVEL
92

    
93
    def __init__(self, pipe):
94
        chirp_common.FileBackedRadio.__init__(self, None)
95
        self.memories = []
96
        self.file_has_rTone = None  # Set in load(), used in _clean_tmode()
97
        self.file_has_cTone = None
98

    
99
        # Persistence for comment lines
100
        # List of tuples of (previous_memory, comment)
101
        self._comments = []
102

    
103
        self._filename = pipe
104
        if self._filename and os.path.exists(self._filename):
105
            self.load()
106
        else:
107
            self._blank(True)
108

    
109
    def get_features(self):
110
        rf = chirp_common.RadioFeatures()
111
        rf.has_bank = False
112
        rf.requires_call_lists = False
113
        rf.has_implicit_calls = False
114
        rf.memory_bounds = (0, len(self.memories)-1)
115
        rf.has_infinite_number = True
116
        rf.has_nostep_tuning = True
117
        rf.has_comment = True
118
        rf.has_rx_dtcs = True
119
        rf.has_variable_power = True
120
        rf.can_odd_split = True
121

    
122
        rf.valid_modes = list(chirp_common.MODES)
123
        rf.valid_tmodes = list(chirp_common.TONE_MODES)
124
        rf.valid_cross_modes = list(chirp_common.CROSS_MODES)
125
        rf.valid_duplexes = ["", "-", "+", "split", "off"]
126
        rf.valid_tuning_steps = list(chirp_common.TUNING_STEPS)
127
        rf.valid_bands = [(1, 10000000000)]
128
        rf.valid_skips = ["", "S"]
129
        rf.valid_characters = chirp_common.CHARSET_1252
130
        rf.valid_name_length = 999
131
        rf.valid_power_levels = [chirp_common.AutoNamedPowerLevel(0.1),
132
                                 DEFAULT_POWER_LEVEL,
133
                                 chirp_common.AutoNamedPowerLevel(1500)]
134

    
135
        return rf
136

    
137
    def _clean(self, headers, line, mem):
138
        """Runs post-processing functions on new mem objects.
139

    
140
        This is useful for parsing other CSV dialects when multiple columns
141
        convert to a single Chirp column."""
142

    
143
        for attr in dir(mem):
144
            fname = "_clean_%s" % attr
145
            if hasattr(self, fname):
146
                mem = getattr(self, fname)(headers, line, mem)
147

    
148
        return mem
149

    
150
    def _clean_tmode(self, headers, line, mem):
151
        """ If there is exactly one of [rToneFreq, cToneFreq] columns in the
152
        csv file, use it for both rtone & ctone. Makes TSQL use friendlier."""
153

    
154
        if self.file_has_rTone and not self.file_has_cTone:
155
            mem.ctone = mem.rtone
156
        elif self.file_has_cTone and not self.file_has_rTone:
157
            mem.rtone = mem.ctone
158

    
159
        return mem
160

    
161
    def _parse_csv_data_line(self, headers, line):
162
        mem = chirp_common.Memory()
163
        try:
164
            if get_datum_by_header(headers, line, "Mode") == "DV":
165
                mem = chirp_common.DVMemory()
166
        except OmittedHeaderError:
167
            pass
168

    
169
        for header in headers:
170
            try:
171
                typ, attr = self.ATTR_MAP[header]
172
            except KeyError:
173
                continue
174
            try:
175
                val = get_datum_by_header(headers, line, header)
176
                if not val and typ == int:
177
                    val = None
178
                else:
179
                    val = typ(val)
180
                if hasattr(mem, attr):
181
                    setattr(mem, attr, val)
182
            except OmittedHeaderError:
183
                pass
184
            except Exception as e:
185
                raise Exception("[%s] %s" % (attr, e))
186

    
187
        if not mem.power:
188
            # Default power level to something if not set
189
            mem.power = DEFAULT_POWER_LEVEL
190

    
191
        return self._clean(headers, line, mem)
192

    
193
    def load(self, filename=None):
194
        if filename is None and self._filename is None:
195
            raise errors.RadioError("Need a location to load from")
196

    
197
        if filename:
198
            self._filename = filename
199

    
200
        self._blank()
201

    
202
        with open(self._filename, newline='', encoding='utf-8-sig') as f:
203
            return self._load(f)
204

    
205
    def _load(self, f):
206
        reader = csv.reader(f, delimiter=chirp_common.SEPCHAR, quotechar='"')
207

    
208
        self._comments = []
209
        good = 0
210
        lineno = 0
211
        last_number = -1
212
        for line in reader:
213
            # Skip (but stash) comment lines that start with #
214
            if line and line[0].startswith('#'):
215
                self._comments.append((last_number, ' '.join(line)))
216
                continue
217
            lineno += 1
218
            if lineno == 1:
219
                header = line
220
                self.file_has_rTone = "rToneFreq" in header
221
                self.file_has_cTone = "cToneFreq" in header
222
                continue
223

    
224
            if len(header) > len(line):
225
                LOG.error("Line %i has %i columns, expected %i",
226
                          lineno, len(line), len(header))
227
                self.errors.append("Column number mismatch on line %i" %
228
                                   lineno)
229
                continue
230

    
231
            try:
232
                mem = self._parse_csv_data_line(header, line)
233
                if mem.number is None:
234
                    raise Exception("Invalid Location field" % lineno)
235
            except Exception as e:
236
                LOG.error("Line %i: %s", lineno, e)
237
                self.errors.append("Line %i: %s" % (lineno, e))
238
                continue
239

    
240
            last_number = mem.number
241
            self._grow(mem.number)
242
            self.memories[mem.number] = mem
243
            good += 1
244

    
245
        if not good:
246
            raise errors.InvalidDataError("No channels found")
247

    
248
    def save(self, filename=None):
249
        if filename is None and self._filename is None:
250
            raise errors.RadioError("Need a location to save to")
251

    
252
        if filename:
253
            self._filename = filename
254

    
255
        with open(self._filename, "w", newline='', encoding='utf-8') as f:
256
            comments = list(self._comments)
257
            writer = csv.writer(f, delimiter=chirp_common.SEPCHAR)
258

    
259
            for index, comment in comments[:]:
260
                if index >= 0:
261
                    break
262
                writer.writerow([comment])
263
                comments.pop(0)
264

    
265
            writer.writerow(chirp_common.Memory.CSV_FORMAT)
266

    
267
            for mem in self.memories:
268
                for index, comment in comments[:]:
269
                    if index >= mem.number:
270
                        break
271
                    writer.writerow([comment])
272
                    comments.pop(0)
273
                write_memory(writer, mem)
274

    
275
    # MMAP compatibility
276
    def save_mmap(self, filename):
277
        return self.save(filename)
278

    
279
    def load_mmap(self, filename):
280
        return self.load(filename)
281

    
282
    def get_memories(self, lo=0, hi=999):
283
        return [x for x in self.memories if x.number >= lo and x.number <= hi]
284

    
285
    def get_memory(self, number):
286
        try:
287
            return self.memories[number].dupe()
288
        except:
289
            raise errors.InvalidMemoryLocation("No such memory %s" % number)
290

    
291
    def _grow(self, target):
292
        delta = target - len(self.memories)
293
        if delta < 0:
294
            return
295

    
296
        delta += 1
297

    
298
        for i in range(len(self.memories), len(self.memories) + delta + 1):
299
            mem = chirp_common.Memory()
300
            mem.empty = True
301
            mem.number = i
302
            self.memories.append(mem)
303

    
304
    def set_memory(self, newmem):
305
        newmem = newmem.dupe()
306
        if newmem.power is None:
307
            newmem.power = DEFAULT_POWER_LEVEL
308
        else:
309
            # Accept any power level because we are CSV, but convert it to
310
            # the class that will str() into our desired format.
311
            newmem.power = chirp_common.AutoNamedPowerLevel(
312
                chirp_common.dBm_to_watts(float(newmem.power)))
313
        self._grow(newmem.number)
314
        self.memories[newmem.number] = newmem
315
        self.memories[newmem.number].name = newmem.name.rstrip()
316

    
317
    def erase_memory(self, number):
318
        mem = chirp_common.Memory()
319
        mem.number = number
320
        mem.empty = True
321
        self.memories[number] = mem
322

    
323
    def get_raw_memory(self, number):
324
        return ",".join(chirp_common.Memory.CSV_FORMAT) + \
325
            os.linesep + \
326
            ",".join(self.memories[number].to_csv())
327

    
328
    @classmethod
329
    def match_model(cls, filedata, filename):
330
        """Match files ending in .CSV"""
331
        try:
332
            filedata = filedata.decode()
333
        except UnicodeDecodeError:
334
            # CSV files are text
335
            return False
336
        return filename.lower().endswith("." + cls.FILE_EXTENSION) and \
337
            (find_csv_header(filedata) or filedata == "")
338

    
339

    
340
def find_csv_header(filedata):
341
    if filedata.startswith('\ufeff') or filedata.startswith('\ufffe'):
342
        # Skip BOM
343
        filedata = filedata[1:]
344
    while filedata.startswith('#'):
345
        filedata = filedata[filedata.find('\n') + 1:]
346
    return filedata.startswith('Location,')
347

    
348

    
349
@directory.register
350
class CommanderCSVRadio(CSVRadio):
351
    """A driver for reading CSV files generated by KG-UV Commander software"""
352
    VENDOR = "Commander"
353
    MODEL = "KG-UV"
354
    FILE_EXTENSION = "csv"
355

    
356
    MODE_MAP = {
357
        "NARR": "NFM",
358
        "WIDE": "FM",
359
    }
360

    
361
    SCAN_MAP = {
362
        "ON":  "",
363
        "OFF": "S"
364
    }
365

    
366
    ATTR_MAP = {
367
        "#":            (int,   "number"),
368
        "Name":         (str,   "name"),
369
        "RX Freq":      (chirp_common.parse_freq, "freq"),
370
        "Scan":         (lambda v: CommanderCSVRadio.SCAN_MAP.get(v), "skip"),
371
        "TX Dev":       (lambda v: CommanderCSVRadio.MODE_MAP.get(v), "mode"),
372
        "Group/Notes":  (str,   "comment"),
373
    }
374

    
375
    def _clean_number(self, headers, line, mem):
376
        if mem.number == 0:
377
            for memory in self.memories:
378
                if memory.empty:
379
                    mem.number = memory.number
380
                    break
381
        return mem
382

    
383
    def _clean_duplex(self, headers, line, mem):
384
        try:
385
            txfreq = chirp_common.parse_freq(
386
                get_datum_by_header(headers, line, "TX Freq"))
387
        except ValueError:
388
            mem.duplex = "off"
389
            return mem
390

    
391
        if mem.freq == txfreq:
392
            mem.duplex = ""
393
        elif txfreq:
394
            mem.duplex = "split"
395
            mem.offset = txfreq
396

    
397
        return mem
398

    
399
    def _clean_tmode(self, headers, line, mem):
400
        rtone = get_datum_by_header(headers, line, "Encode")
401
        ctone = get_datum_by_header(headers, line, "Decode")
402
        if rtone == "OFF":
403
            rtone = None
404
        else:
405
            rtone = float(rtone)
406

    
407
        if ctone == "OFF":
408
            ctone = None
409
        else:
410
            ctone = float(ctone)
411

    
412
        if rtone:
413
            mem.tmode = "Tone"
414
        if ctone:
415
            mem.tmode = "TSQL"
416

    
417
        mem.rtone = rtone or 88.5
418
        mem.ctone = ctone or mem.rtone
419

    
420
        return mem
421

    
422
    @classmethod
423
    def match_model(cls, filedata, filename):
424
        """Match files ending in .csv and using Commander column names."""
425
        return filename.lower().endswith("." + cls.FILE_EXTENSION) and \
426
            filedata.startswith(b"Name,RX Freq,TX Freq,Decode,Encode,TX Pwr,"
427
                                b"Scan,TX Dev,Busy Lck,Group/Notes") or \
428
            filedata.startswith(b'"#","Name","RX Freq","TX Freq","Decode",'
429
                                b'"Encode","TX Pwr","Scan","TX Dev",'
430
                                b'"Busy Lck","Group/Notes"')
431

    
432

    
433
@directory.register
434
class RTCSVRadio(CSVRadio):
435
    """A driver for reading CSV files generated by RT Systems software"""
436
    VENDOR = "RT Systems"
437
    MODEL = "CSV"
438
    FILE_EXTENSION = "csv"
439

    
440
    DUPLEX_MAP = {
441
        "Minus":    "-",
442
        "Plus":     "+",
443
        "Simplex":  "",
444
        "Split":    "split",
445
    }
446

    
447
    SKIP_MAP = {
448
        "Off":    "",
449
        "On":     "S",
450
        "P Scan": "P",
451
        "Skip":   "S",
452
        }
453

    
454
    TMODE_MAP = {
455
        "None":     "",
456
        "T Sql":    "TSQL",
457
    }
458

    
459
    BOOL_MAP = {
460
        "Off":  False,
461
        "On":   True,
462
    }
463

    
464
    ATTR_MAP = {
465
        "Channel Number":    (int,   "number"),
466
        "Receive Frequency": (chirp_common.parse_freq, "freq"),
467
        "Offset Frequency":  (chirp_common.parse_freq, "offset"),
468
        "Offset Direction":  (lambda v:
469
                              RTCSVRadio.DUPLEX_MAP.get(v, v), "duplex"),
470
        "Operating Mode":    (str,   "mode"),
471
        "Name":              (str,   "name"),
472
        "Tone Mode":         (lambda v:
473
                              RTCSVRadio.TMODE_MAP.get(v, v), "tmode"),
474
        "CTCSS":             (lambda v:
475
                              float(v.split(" ")[0]), "rtone"),
476
        "DCS":               (int,   "dtcs"),
477
        "Skip":              (lambda v:
478
                              RTCSVRadio.SKIP_MAP.get(v, v), "skip"),
479
        "Step":              (lambda v:
480
                              float(v.split(" ")[0]), "tuning_step"),
481
        "Mask":              (lambda v:
482
                              RTCSVRadio.BOOL_MAP.get(v, v), "empty",),
483
        "Comment":           (str,   "comment"),
484
        }
485

    
486
    def _clean_duplex(self, headers, line, mem):
487
        if mem.duplex == "split":
488
            try:
489
                val = get_datum_by_header(headers, line, "Transmit Frequency")
490
                val = chirp_common.parse_freq(val)
491
                mem.offset = val
492
            except OmittedHeaderError:
493
                pass
494

    
495
        return mem
496

    
497
    def _clean_mode(self, headers, line, mem):
498
        if mem.mode == "FM":
499
            try:
500
                val = get_datum_by_header(headers, line, "Half Dev")
501
                if self.BOOL_MAP[val]:
502
                    mem.mode = "FMN"
503
            except OmittedHeaderError:
504
                pass
505

    
506
        return mem
507

    
508
    def _clean_ctone(self, headers, line, mem):
509
        # RT Systems only stores a single tone value
510
        mem.ctone = mem.rtone
511
        return mem
512

    
513
    @classmethod
514
    def match_model(cls, filedata, filename):
515
        """Match files ending in .csv and using RT Systems column names."""
516
        # RT Systems provides a different set of columns for each radio.
517
        # We attempt to match only the first few columns, hoping they are
518
        # consistent across radio models.
519
        try:
520
            filedata = filedata.decode()
521
        except UnicodeDecodeError:
522
            # CSV files are text
523
            return False
524
        return filename.lower().endswith("." + cls.FILE_EXTENSION) and \
525
            filedata.startswith("Channel Number,Receive Frequency,"
526
                                "Transmit Frequency,Offset Frequency,"
527
                                "Offset Direction,Operating Mode,"
528
                                "Name,Tone Mode,CTCSS,DCS")
(3-3/3)