-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathVCFParser.py
1271 lines (1029 loc) · 54 KB
/
VCFParser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import re # import per l'utilizzo delle regex
import sys # import per chiudere il programma
from urllib.parse import urlparse # import per controllare l'url
from pathlib import Path # import per prendere il path del file nell'url
from dateutil.parser import parse # per controllare se una stringa è una data
import logging, sys # per visualizzare le stampe di debug
import gzip # per leggere i file gzip
"""
LINE 330/331 E 371/372 se non si vuole che gli "id" di info e format di default
vengano sovrascritti se cambia il tipo vanno decommentate, non è ben specificato
nel file se sia possibile o meno..
"""
class VCFParser:
# parametri da inizializzare
filename = ""
zipped = True
# Definisco i pattern per le regex comuni come privati
__general_match = "[a-zA-Z0-9_.]+"
__id_pattern = "ID=(?P<id>" + __general_match + ")"
__desc_pattern = "Description=\"[^\"]+\""
__num_patter = "Number=(?P<num>([0-9]+|A|R|G|.))"
# mi salvo le righe in cui trovo fileformat (deve essere la prima),
# fileDate, reference, pedigree/pedigreeDB
__lineFileFormat = None
__lineFileDate = None
__lineReference = None
__linePedigree = None
__actualLine = 0 # riga in cui sono arrivato a leggere (per print error)
# conto le righe vuote per sistemare il conteggio per le print error
__emptyLine = 0
# ultimo valore del campo pos trovato cosi controllo che siano in ordine crescente
__lastPosValue = -1
# valori ID dei metadati FILTER trovati
__lstIdFilterValue = []
# valori ID dei metadati CONTIG trovati
__lstIdContigValue = []
# valori ID dei metadati ALT trovati
__lstIdAltValue = []
# valori del campo chrom trovati, mi serve perchè le pos devono essere
# in ordine crescente SOLO per lo stesso chrom e non in assoluto
# inoltre possono essere usati nel campo alt
__lstChromValue = []
# valori pos del campo dati
__lstPosValue = []
# valori accettati per lo starti id del campo alt (metaline)
__lstAltValueAccepted = ["DEL", "INS", "DUP", "INV", "CN", "DUP:TANDEM", "DEL:ME", "INS:ME"]
# lunghezza della header line
__lenHeaderDataLine = -1
# mi dice se è possibile inserire il campo format e dei sample nei dati
__FormatDataAllowed = False
# variabili che mi dicono se è definito il campo format e ulteriori sample ids
__formatDefine = False
__otherSampleDefine = False
# valori del campo info (metaline)
__dictInfo = {
'AA': 'String', 'AC': 'Integer', 'AF': 'Float', 'AN': 'Integer',
'BQ': 'Float', 'CIGAR': 'String', 'DB': 'Flag', 'DP': 'Integer',
'END': 'Integer', 'H2': 'Flag', 'H3': 'Flag', 'MQ': 'Float',
'MQ0': 'Integer', 'NS': 'Integer', 'SB': 'String', 'SOMATIC': 'Flag',
'VALIDATED': 'Flag', '1000G': 'Flag',
# structural variants
'IMPRECISE': 'Flag', 'NOVEL': 'Flag', 'SVTYPE': 'String',
'SVLEN': 'Integer', 'CIPOS': 'Integer', 'CIEND': 'Integer',
'HOMLEN': 'Integer', 'HOMSEQ': 'String', 'BKPTID': 'String',
'MEINFO': 'String', 'METRANS': 'String', 'DGVID': 'String',
'DBVARID': 'String', 'DBRIPID': 'String', 'MATEID': 'String',
'PARID': 'String', 'EVENT': 'String', 'CILEN': 'Integer',
'DPADJ': 'Integer', 'CN': 'Integer', 'CNADJ': 'Integer',
'CICN': 'Integer', 'CICNADJ': 'Integer'
}
# valori del campo format (metaline)
# PL, HQ, PS devono contenere almeno 2 valori
__dictFormat = {
'GT' : 'String', 'DP' : 'Integer', 'FT' : 'String',
'GL' : 'Float', 'GLE' : 'String', 'PL' : 'Integer',
'GP' : 'Float', 'EC' : 'Integer', 'GQ' : 'Integer',
'HQ' : 'Integer', 'PS' : 'Integer', 'PQ' : 'Integer',
'MQ' : 'Integer'
}
# mi dice se nelle format line è definito il campo gt
__gtPresent = False
# se nella riga dati, campo alt, ci sono dei breakends formati da
# chrom:pos li aggiungo e li controllo a fine file perchè devo prima scorrere tutti
# i chrom e le pos
# nella lista salvo [(stringa, line), (stringa2, line2)...]
__lstBreakendsValue = []
"""
inizializzo la classe
"""
def __init__(self, filename):
self.filename = filename
self.zipped = filename.endswith("gz")
# inizializzo le stampe di debug
# level = INFO -> no print, level= DEBUG -> print
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
"""
funzione che legge tutto il contenuto di un file e richiama il parser
filename = nome del file
zipped = specifica se il file è in formato compresso oppure no
"""
def parseFile(self):
# apro e leggo tutto il contenuto del file
if(self.zipped == False):
with open(self.filename, 'r') as vcf_file:
lines = vcf_file.readlines()
else:
lines = []
with gzip.open(self.filename, "rt") as vcf_file:
for line in vcf_file:
lines.append(line)
vcf_file.close() # chiudo il file
self.__parser(lines) # richiamo il parser
"""
parso tutto il file riga per riga! se incontro un errore il programma
stampa il messaggio e si blocca
"""
def __parser(self, allLines):
metadataAllows = True # variabile booleana che mi dice se sono ancora nella sezione ## oppure sono nella sezione dati
for line in allLines:
if(line.strip() == ""):
self.__emptyLine = self.__emptyLine +1
# se sono nella sezione metadati controllo che la stringa inizi con ##
elif(metadataAllows):
# guardo che tipo di riga sto leggendo metadati o no
isMetadata = line.startswith('##')
# Se ho trovato una stringa di metadati
# tolgo i ## iniziali e passo la stringa al parser ad hoc per i metadati
if (isMetadata == True):
self.__parseMetadata(line[2:])
self.__actualLine = self.__actualLine + 1 + self.__emptyLine
self.__emptyLine = 0 # risetto le righe vuote perchè gia sommate
# altrimenti controllo che sia l'header della sezione dati
else:
metadataAllows = False
line = self.__white_cleaner(line)
# richiamo la funzione
# se è tutto a posto proseguo altrimenti la funzione
# bloccherà il programma
self.__myHeaderMetadata(line)
self.__actualLine = self.__actualLine + 1 + self.__emptyLine
self.__emptyLine = 0 # risetto le righe vuote perchè gia sommate
else:
# ripulisco la linea da tab, spazi etc..
clearLine = self.__white_cleaner(line)
lst = clearLine.split(" ")
self.__parseData(lst)
self.__actualLine = self.__actualLine + 1 + self.__emptyLine
self.__emptyLine = 0 # risetto le righe vuote perchè gia sommate
# una volta lette tutte le righe controllo gli eventuali valori breakends di alt
self.__matchBeetwenParenthesis()
print("GREAT! VCF IS WELL FORMED!!!")
"""
parser per le righe di metadati
"""
def __parseMetadata(self, line):
"""
1) cerco il primo = dato che ogni stringa è una coppia key-value
2) faccio uno "switch" per capire in quale metadato mi trovo
"""
indexEqual = line.find("=")
# se non trovo = la stringa è mal formata
if(indexEqual == -1):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", invalid metadata line. Missing = character!")
info = line[:indexEqual]
# metto la riga in maiuscolo, cosi matcho anche con le righe in minuscolo
info = info.upper()
if (info == "FILEFORMAT"):
self.__myFileformat(line)
elif (info == "FILEDATE"):
self.__myFiledate(line)
elif (info == "FILTER"):
self.__myFilter(line)
elif (info == "INFO"):
self.__myInfo(line)
elif (info == "FORMAT"):
self.__myFormat(line)
elif (info == "ALT"):
self.__myAlt(line)
elif (info == "ASSEMBLY"):
self.__myAssembly(line)
elif (info == "CONTIG"):
self.__myContig(line)
elif (info == "SAMPLE"):
self.__mySample(line)
elif (info == "PEDIGREEDB"):
self.__myPedigreedb(line)
elif (info == "PEDIGREE"):
self.__myPedigree(line)
elif (info == "REFERENCE"):
self.__myReference(line)
elif (info == "SOURCE"):
self.__mySource(line)
elif (info == "PHASING"):
self.__myPhasing(line)
else:
print("Warning: unknown metadata line starting with: " + info + ", at line: "
+ str(self.__actualLine + 1))
"""
funzione che controlla che la prima riga sia del tipo
##fileformat=VCFvNumber.Number
"""
def __myFileformat(self, line):
# debug printing
logging.debug(" __myFileformat() parsing line: " + line)
# controllo che sia la prima volta che incontro la riga fileFormat
if(self.__lineFileFormat is not None):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", fileformat already define!")
elif(self.__actualLine != 0):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", fileformat must be defined at line 0")
else:
# controllo che sia well-formed tramite regex
response = re.match("fileformat=VCFv[0-9].[0-9]$", line, re.IGNORECASE)
# se well-formed aggiorno __lineFileFormat altrimenti stampo errore ed esco
if (response):
self.__lineFileFormat = self.__actualLine
else:
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", fileformat malformed!")
"""
controllo la riga FILEDATI
##filedate=date
"""
def __myFiledate(self, line):
# debug printing
logging.debug(" __myFiledate() parsing line: " + line)
if(self.__lineFileDate is not None):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", filedate already define!")
# prendo tutto quello che c'è dopo = e controllo sia una data
indexEqual = line.find("=")
date = line[indexEqual+1:]
result = self.__is_date(date)
if(result):
self.__lineFileDate = self.__actualLine
"""
controllo la riga FILTER
##filter=<ID=ID,Description="description">
"""
def __myFilter(self,line):
# debug printing
logging.debug(" __myFilter() parsing line: " + line)
response = re.match("FILTER=<" + self.__id_pattern + ",\s*" + self.__desc_pattern + ">$" ,line, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", FILTER malformed!")
# se è well formed mi salvo l'id in quanto devo confrontarlo con quello dei campi data
else:
self.__lstIdFilterValue.append(response.group("id").upper())
"""
controllo la riga info
##info=<ID=ID,Number=number,Type=type,Description="description",Source="source",Version="version">
"""
def __myInfo(self,line):
# debug printing
logging.debug(" __myInfo() parsing line: " + line)
response = re.match("INFO=<" + self.__id_pattern + ",\s*" + self.__num_patter + ",\s*" +
"Type=(?P<type>(Integer|Float|Flag|Character|String)),\s*" +
self.__desc_pattern + "(,\s*Source=(\")*[a-zA-Z0-9_\s]+(\")*)*" +
"(,\s*Version=(\")*[0-9]+(\")*)*>$" ,line, re.IGNORECASE)
# se well-formed stampo eventuale warning flag-num
if (response):
self.__updateInfoDictionary(response.group("id"), response.group("type"), response.group("num"))
# controllo che se type = flag allora number = 0.. E' UNO SHOULD BE, QUINDI NON OBBLIGATORIO
if (response.group("type") == "Flag" and response.group("num") != "0"):
print("Warning: number should be zero with Flag type at line: " + str(self.__actualLine + 1))
else:
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", INFO malformed!")
"""
controlla se il campo id è nel dizionario, se c'è già controlla che
anche il type sia definito allo stesso modo!
Accetto info line multiple definite dall'utente
NEL DIZIONARIO SALVO ID = [TIPO, NUMERO]
in quanto poi nelle righe data mi serve sapere quanti valori possono essere
aggiunti per ciascun id!
"""
def __updateInfoDictionary(self,id,Type, num):
# controllo se è già definito e se il tipo è lo stesso di quello già presente
if (id in self.__dictInfo):
#if (Type != self.__dictInfo[id] and Type != self.__dictInfo[id][0]):
#sys.exit("Error at line: " + str(self.__actualLine + 1) + ", " + id + " id already define. Check type!")
self.__dictInfo[id] = [Type, num]
else: # se non è definito lo aggiungo come tipo
self.__dictInfo[id] = [Type, num]
"""
controllo la riga format
##info=<ID=ID,Number=number,Type=type,Description="description">
"""
def __myFormat(self,line):
# debug printing
logging.debug(" __myFormat() parsing line: " + line)
response = re.match("FORMAT=<" + self.__id_pattern + ",\s*Number=(?P<num>([0-9]+|.)),\s*" +
"Type=(?P<type>(Integer|Float|Character|String)),\s*" +
self.__desc_pattern + ">$", line, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", FORMAT malformed!")
self.__FormatDataAllowed = True
self.__updateFormatDictionary(response.group("id"), response.group("type"), response.group("num"))
"""
controlla se il campo id è nel dizionario, se c'è già controlla che
anche il type sia definito allo stesso modo!
Accetto format line multiple definite dall'utente
NEL DIZIONARIO SALVO ID = [TIPO, NUMERO]
in quanto poi nelle righe data mi serve sapere quanti valori possono essere
aggiunti per ciascun id!
"""
def __updateFormatDictionary(self,id,Type, num):
# controllo se è già definito e se il tipo è lo stesso di quello già presente
# dato che puo averlo già definito per accettarlo doppio controllo anche
# se è uguale a quello in lista
if (id in self.__dictFormat):
#if (Type != self.__dictFormat[id] and Type != self.__dictFormat[id][0]):
# sys.exit("Error at line: " + str(self.__actualLine + 1) + ", " + id + " id already define. Check type!")
self.__specialCheck(id,num)
self.__dictFormat[id] = [Type, num]
else: # se non è definito lo aggiungo come tipo
self.__dictFormat[id] = [Type, num]
"""
Controlli particolari sul campo id delle righe format
GT = salvo se l'ho trovato, se c'è dovrà essere il primo in ogni format
PL, EC = num > 1
HQ = num deve essere 2
"""
def __specialCheck(self, id, num):
if(id == "GT"):
self.__gtPresent = True
elif(id == "HQ" and num!="2"):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", "
+ id + " number must be 2!")
elif ((id == "PL" or id == "EC") and num =="1"):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", "
+ id + " number must be >1!")
"""
controllo la riga alt
##info=<ID=ID;ID,Description="description">
"""
def __myAlt(self,line):
# debug printing
logging.debug(" __myAlt() parsing line: " + line)
# aggiungo una seconda parte al patter ID per accettare una colon-separated list
response = re.match("ALT=<ID=(?P<id>" + self.__general_match + "(:" + self.__general_match + ")*)"+ ",\s*" + self.__desc_pattern + ">$", line, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", ALT malformed!")
"""
la regex ha dato positive match, controllo che il campo id sia valido
deve iniziare con qualcosa presente nella lista __lstAltValueAccepted
"""
startOk = False
index = 0
idString = response.group("id")
while startOk == False and index < len(self.__lstAltValueAccepted):
stringStart = self.__lstAltValueAccepted[index]
# controllo se l'inizio matcha con una delle possibilità
if(idString.startswith(stringStart)):
startOk = True
index = index + 1 # passo alla stringa successiva
# se non inizia con uno dei caratteri consentiti mi fermo
if (startOk == False):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", ALT ID malformed!")
# se l'id è valido ed è composto da CNV stampo un warning
if (idString.startswith("CN")):
print("Warning! At line: " + str(self.__actualLine + 1) +
" CNV category should not be used when a more specific category can be applied")
self.__lstIdAltValue.append("<" + response.group("id") +">")
"""
controllo la riga assembly
##assembly=url to fasta file
"""
def __myAssembly(self,line):
# debug printing
logging.debug(" __myAssembly() parsing line: " + line)
# sono sicuro che inizia con assembly= perchè lo matcho nella chiamata di funzione
# quindi prendo tutto quello che c'è dopo l'= e controllo sia una url
indexEqual = line.find("=")
url = line[indexEqual+1:] # +1 per escludere l'uguale
# uso il metodo urlparse della libreria urllib.parse
getUrlResponse = urlparse(url)
# prendo il nome del file, voglio che controllare che sia un fasta
urlFile = Path(getUrlResponse.path).name
# accetto qualsiasi cosa che termini con .fasta, sono sicuro che
# va bene perchè ho già controllato che sia un url
response = re.match("(.*).(fasta|fa)$", urlFile, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", ASSEMBLY malformed!")
"""
controllo la riga contig
##contig=<ID=ctg1,lenght=number,....>
"""
def __myContig(self,line):
# debug printing
logging.debug(" __myContig() parsing line: " + line)
response = re.match("CONTIG=<" + self.__id_pattern + "(,(\s)*(.*))*>$", line, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", CONTIG malformed!")
# aggiungo l'id alla lista, mi servirà per i campi data
self.__lstIdContigValue.append(response.group("id").upper())
"""
controllo la riga sample
##SAMPLE=<ID=S_ID,Genomes=G1_ID;G2_ID; ...;GK_ID,Mixture=N1;N2; ...;NK,Description=S1;S2; ...;SK>
"""
def __mySample(self,line):
# debug printing
logging.debug(" __mySample() parsing line: " + line)
response = re.match("SAMPLE=<" + self.__id_pattern + ",\s*"
+ "Genomes=" + self.__general_match + "(;" + self.__general_match + ")*" + ",\s*"
+ "Mixture=(.[0-9]|[0-9].)" + "(;(.[0-9]|[0-9].))*" + ",\s*"
+ self.__desc_pattern +">$"
, line, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", SAMPLE malformed!")
"""
controllo la riga pedigreeDB
##pedigreeDB=<url>
"""
def __myPedigreedb(self,line):
# debug printing
logging.debug(" __myPedigreedb() parsing line: " + line)
if(self.__linePedigree is not None):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", pedigree already define!")
# controllo se il formato è valido
response_url = re.match("PEDIGREEDB=<(?P<url>(.*))>$", line, re.IGNORECASE)
#se il formato non corrisponde mi fermo
if(not response_url):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", PEDEGREEDB malformed!")
# se il formato corrisponde controllo che la url sia valida
getUrlResponse = response_url.group("url")
# se non è well-formed stampo errore
if (not getUrlResponse):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", PEDEGREEDB malformed!")
self.__linePedigree = self.__actualLine
"""
controllo la riga pedigree
##PEDIGREE=<Name_0=G0-ID,Name_1=G1-ID,...,Name_N=GN-ID>
"""
def __myPedigree(self,line):
# debug printing
logging.debug(" __myPedigree() parsing line: " + line)
if(self.__linePedigree is not None):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", pedigree already define!")
# controllo se il formato è valido
response = re.match("PEDIGREE=<" + self.__general_match + "=" + self.__general_match
+ "(,\s*" + self.__general_match + "=" + self.__general_match
+")*>$", line, re.IGNORECASE)
#se il formato non corrisponde mi fermo
if(not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", PEDIGREE malformed!")
else: # se corrisponde controllo che i name non siano ripetuti
# prendo la stringa compresa tra <>
indexMin = line.find("<")
indexMax = line.find(">")
text = line[indexMin+1:indexMax]
#splitto tutte le stringhe divise da , per ottenere i sottogruppi
couple = text.strip().split(',')
name = []
# di ogni sottogruppo elimino la parte dall'= alla fine
# e salvo la parte iniziale nella lista name
[name.append(single[:single.find("=")]) for single in couple]
#creo una lista passando da un set, cosi elimino possibili doppioni
listNoRepeat = list(set(name))
# se le due liste non sono lunghe ugali c'erano dei doppioni nei nomi --> errore
if(len(name) != len(listNoRepeat)):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", PEDIGREE malformed!")
self.__linePedigree = self.__actualLine
"""
Return whether the string can be interpreted as a date.
:param string: str, string to check for date
:param fuzzy: bool, ignore unknown tokens in string if True
"""
def __is_date(self, string, fuzzy=False):
try:
parse(string, fuzzy=fuzzy)
return True
except ValueError:
return False
"""
controllo la riga reference
##REFERENCE=...
"""
def __myReference(self,line):
# debug printing
logging.debug(" __myReference() parsing line: " + line)
if(self.__lineReference is not None):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", reference already define!")
#response = re.match("reference=file:///[\w|/|-]+.(fa|fasta)$", line, re.IGNORECASE)
response = re.match("reference=[\w:/._-]+$", line, re.IGNORECASE);
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", REFERENCE malformed!")
"""
controllo la riga source
##SOURCE=...
"""
def __mySource(self,line):
# debug printing
logging.debug(" __mySource() parsing line: " + line)
#response = re.match("reference=file:///[\w|/|-]+.(fa|fasta)$", line, re.IGNORECASE)
response = re.match("source=[\w:/._-]+$", line, re.IGNORECASE);
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", SOURCE malformed!")
"""
controllo la riga phasing
##PHASING=...
"""
def __myPhasing(self,line):
# debug printing
logging.debug(" __myPhasing() phasing line: " + line)
#response = re.match("reference=file:///[\w|/|-]+.(fa|fasta)$", line, re.IGNORECASE)
response = re.match("phasing=[\w:/._-]+$", line, re.IGNORECASE);
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", PHASING malformed!")
"""
controllo se la riga è l'intestazione delle righe dati
#CHROM POS ID REF ALT QUAL FILTER INFO ........
"""
def __myHeaderMetadata(self,line):
# debug printing
logging.debug(" __myHeaderMetadata() parsing line: " + line)
checkIfHeader = line.startswith('#CHROM')
if(checkIfHeader == False):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ",missing HEADER DATALINE!")
if(self.__FormatDataAllowed == True):
regexFormatIfAllowed = "((((\s)+FORMAT))((\s)+([a-zA-Z0-9_])+)*){0,1}"
else:
regexFormatIfAllowed = ""
response = re.match("#CHROM(\s)+POS(\s)+ID(\s)+REF(\s)+ALT(\s)+"
+"QUAL(\s)+FILTER(\s)+INFO"
+ regexFormatIfAllowed + "$"
, line, re.IGNORECASE)
# se non è well-formed stampo errore, faccio piu controlli per stampe piu chiare
if (not response and regexFormatIfAllowed == "" and "FORMAT" in line):
sys.exit("Error at line: " + str(self.__actualLine + 1)
+ ", HEADER DATALINE malformed! Too add FORMAT field you must add format metaline")
elif(not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", HEADER DATALINE malformed!")
# salvo la lunghezza dell'header per confrontarla con le righe data
self.__lenHeaderDataLine = len(line.split(" "))
if("FORMAT" in line):
self.__formatDefine = True
# cerco dove inizia la parola format, aggiungo 7 per saltarla e prendere
# tutte le parole dopo format
indexFormat = line.find("FORMAT") + 7
self.__checkNoDuplicateName(line[indexFormat:])
"""
funzione che data una stringa controlla che non ci siano parole duplicate
Ogni parola deve essere divisa da un solo spazio
return: no duplicate
sys.exit: one or more word duplicate
"""
def __checkNoDuplicateName(self,line):
# se la riga è vuota non c'è a
if(line != ""):
self.__otherSampleDefine = True
# splitto la stringa per ottenere una lista
listOfElems = line.split(" ")
# se le lunghezze sono diverse allora ci sono dei nomi duplicati
if len(listOfElems) != len(set(listOfElems)):
sys.exit("Error at line: " + str(self.__actualLine + 1)
+ ", duplicate sample IDs are not allowed in header dataline!")
"""
funzione che serve per sostituire tutti i doppi spazi/tab con un singolo
spazio
"""
def __white_cleaner(self,line):
line = line.strip()
line = line.replace('\t', ' ') # sostituisco i tab con gli spazi
# sostituisco i doppi spazi, ciclo per sistemare anche se ci sono 3 o piu spazi
while ' ' in line:
line = line.replace(' ', ' ')
return line
"""
prende in input una lista contenente i vari campi data
gia splittati!
Ricorda: i primi campi sono fissi, posso richiamare direttamente
la funzione corretta:
CHROM POS ID REF ALT QUAL FILTER INFO
"""
def __parseData(self,lst):
lst_len = len(lst)
if (self.__lenHeaderDataLine > lst_len):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", missing same data!")
elif (self.__lenHeaderDataLine < lst_len):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", too much data!")
# controllo tutti i campi fissi, devono esserci per forza!!!
self.__myDataChrom(lst[0])
self.__myDataPos(lst[1])
self.__myDataId(lst[2])
self.__myDataRef(lst[3])
numDataAllowed = self.__myDataAlt(lst[4])
self.__myDataQual(lst[5])
self.__myDataFilter(lst[6])
self.__myDataInfo(lst[7],numDataAllowed)
# se è definito il campo format
if(self.__formatDefine == True):
self.__myDataFormat(lst[8])
# se sono definiti anche dei sample
if(self.__otherSampleDefine == True):
for sampleLine in lst[9:]:
self.__myDataSample(sampleLine, lst[8],numDataAllowed)
"""
controlla che il campo chrome della riga dati sia well-formed e valido
"""
def __myDataChrom(self,chrom):
# debug printing
logging.debug(" __myChrom() parsing chrom: " + chrom)
response = re.match('(([a-z0-9_]+$)|(.$)|(<([a-z0-9]+|.)>$)|.$)', chrom, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", CHROM malformed!")
# se è il primo chrom che incontro lo aggiungo alla lista e parto con l'ordine crescente per le pos
if(len(self.__lstChromValue) == 0):
self.__lstChromValue.append(chrom)
self.__lastPosValue = -1
# se trovo un nuovo chrom parto di nuovo con l'ordine crescente
elif(self.__lstChromValue[-1] != chrom):
self.__lstChromValue.append(chrom)
self.__lastPosValue = -1
"""
controlla che il campo pos della riga dati sia well-formed e valido
I campi pos devono essere in ordine crescente, quindi confronto il valore
che trovo con il precedente
"""
def __myDataPos(self,pos):
# debug printing
logging.debug(" __myPos() parsing pos: " + pos)
response = re.match("([0-9]+|.)$", pos, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", POS malformed!")
# controllo se pos ha un valore o non è noto
elif(pos != "."):
# se il pos è noto allora controllo sia maggiore rispetto al precedente
# e ne aggiorno il valore, altrimenti esco.
if (int(pos) < self.__lastPosValue):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", POS value must be in ascending order!")
self.__lastPosValue = int(pos)
# salvo il valore del pos trovato nella lista perchè mi serve per il campo alt
self.__lstPosValue.append(pos)
"""
controlla che il campo id della riga dati sia well-formed e valido
"""
def __myDataId(self,id):
# debug printing
logging.debug(" __myId() parsing id: " + id)
splitId = id.split(";")
for singleSplit in splitId:
response = re.match('(([a-z0-9_.]+$)|(.$))', singleSplit, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", ID malformed!")
"""
controlla che il campo ref della riga dati sia well-formed e valido
"""
def __myDataRef(self,ref):
# debug printing
logging.debug(" __myRef() parsing ref: " + ref)
response = re.match('((A|C|G|T|N)+)|.$', ref, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", REF malformed!")
"""
controlla che il campo alt della riga dati sia well-formed e valido
Alt può essere formato da:
1) una stringa formata da A,C,G,T,N,*
2) . se valore sconosciuto
3) <id_alt>
4) un angle brackets string, vedesi specifiche pdf page 12
Nei casi 1-4 può essere formata da piu valori suddivisi da ,
"""
def __myDataAlt(self,alt):
# debug printing
logging.debug(" __myDataAlt() parsing dataAlt: " + alt)
numberOfDataAllowed = 1
"""
# controllo se sono nel caso due
if(alt.startswith("<")):
if(alt in self.__lstIdAltValue == False):
sys.exit("Error at line: " + str(self.__actualLine + 1)
+ ", ALT malformed! No value id defineds")
elif(alt == "."): # case 2
return numberOfDataAllowed
else: # case 1 and 4
"""
# splitto la stringa ad ogni , e poi confronto
lstSplitted = alt.split(",")
# controllo che la stringa non contenga doppioni non avrebbe senso
if len(lstSplitted) != len(set(lstSplitted)):
sys.exit("Error at line: " + str(self.__actualLine + 1)
+ ", duplicate alt data are not allowed!")
numberOfDataAllowed = len(lstSplitted)
for val in lstSplitted:
if(val.startswith("<")):
if(val in self.__lstIdAltValue == False):
sys.exit("Error at line: " + str(self.__actualLine + 1)
+ ", ALT malformed! No value id defineds")
elif(alt == "."):
continue
else:
# controllo se è nel caso 1
response = self.__matchAltBaseString(val)
# se il caso 1 è falso controllo il 4, se è falso error
if (not response):
"""
controllo se la stringa parte con [ cerco l'altra [,
matcho tutto dopo [ con __matchAltBaseString
e quello tra [] controllo sia chrom:pos
"""
beetweenPar = None
baseStr = None
if(val.startswith("[")):
indexPar = val[1:].find("[") +1
beetweenPar = val[1:indexPar]
baseStr = val[indexPar+1:]
elif(val.startswith("]")):
indexPar = val[1:].find("]") +1
beetweenPar = val[1:indexPar]
baseStr = val[indexPar+1:]
elif(val.endswith("]")):
indexPar = val.find("]")
beetweenPar = val[indexPar+1:len(val)-1]
baseStr = val[:indexPar]
elif(val.endswith("[")):
indexPar = val.find("[")
beetweenPar = val[indexPar+1:len(val)-1]
baseStr = val[:indexPar]
if (beetweenPar == None or baseStr == None):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", ALT malformed!")
else:
# matcho tutto quello dopo con baseStr
res = self.__matchAltBaseString(baseStr)
# creo la coppia e l'aggiungo alla lista
couple = (beetweenPar, self.__actualLine)
self.__lstBreakendsValue.append(couple)
if(res == False):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", ALT malformed!")
return numberOfDataAllowed # mi serve sapere da quanti valori è composto il campo alt per gestire il campo data
"""
prova a matchare la stringa con A,c,g,t,n,.,*
"""
def __matchAltBaseString(self, baseStr):
response = re.match('[A|C|G|T|N|.|\*]+$', baseStr, re.IGNORECASE)
if(response):
return True
return False
"""
controllo che la __lstBreakendsValue sia del tipo chromVal:PosVal
con valori di chrom e pos validi
"""
def __matchBeetwenParenthesis(self):
for couple in self.__lstBreakendsValue:
values = couple[0].split(":")
if(len(values)!=2): # non ho trovato i : allora non è ben formata
sys.exit("Error at line: " + str(int(couple[1]) + 1) + ", ALT breakends malformed!")
# controllo che i due valori siano chrom:pos
elif(values[0] not in self.__lstChromValue or values[1] not in self.__lstPosValue):
sys.exit("Error at line: " + str(int(couple[1]) + 1) + ", ALT breakends value not valid!")
"""
controlla che il campo qual della riga dati sia well-formed e valido
"""
def __myDataQual(self,qual):
# debug printing
logging.debug(" __myQual() parsing qual: " + qual)
response = re.match('([0-9]+|[0-9]+.[0-9]+|.)$', qual, re.IGNORECASE)
# se non è well-formed stampo errore
if (not response):
sys.exit("Error at line: " + str(self.__actualLine + 1) + ", qual malformed!")
"""
controlla che il campo filter della riga dati sia well-formed e valido
In questo caso non serve una regex il campo puo essere:
PASS
.
una combinazione dei valori presenti in __lstIdFilterValue
"""
def __myDataFilter(self,filter):
# debug printing
logging.debug(" __myDataFilter() parsing filter: " + filter)
filter = filter.upper() # metto tutto in maiuscolo per semplicita
# controllo se uguale a PASS
if(filter == "PASS"):
return
# controllo se uguale al .
if(filter == "."):
return
# controllo se uguale a 0 (meglio non usarlo ma possibile)
if(filter == "0"):
print("Warning! Value 0 should not be use as value for filter at line: " + str(self.__actualLine + 1))
return
# cerco il primo ;
indexEqual = filter.find(";")
# se non trovo nessun ; controllo che sia un singolo valore presente nella lista
if(indexEqual == -1 and filter in self.__lstIdFilterValue):
return
else:
# ci sono dei punti e virgola, controllo che il numero di ;
# non superi il numero di filtri presenti nella lista
# es: lst = q10,s50 filter=q10;s50;q10 NO!
count = len(re.findall(";", filter))