-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmsannika_fdr.py
259 lines (208 loc) · 10.5 KB
/
msannika_fdr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#!/usr/bin/env python3
# MS ANNIKA FDR VALIDATOR
# 2024 (c) Micha Johannes Birklbauer
# https://github.com/michabirklbauer/
# micha.birklbauer@gmail.com
# version tracking
__version = "1.1.5"
__date = "2024-06-19"
# REQUIREMENTS
# pip install numpy
# pip install pandas
# pip install openpyxl
######################
"""
DESCRIPTION:
A script to group and validate results from MS Annika searches.
USAGE:
msannika_fdr.py f [f ...]
[-fdr FDR][--false_discovery_rate FDR]
[-h][--help]
[--version]
positional arguments:
f MS Annika result files in Microsoft Excel format (.xlsx)
to process.
optional arguments:
-fdr FDR, --false_discovery_rate FDR
False discovery rate to validate results for. Supports
both percentage input (e.g. 1) or fraction input (e.g.
0.01). By default not set and the input results will
just be grouped to crosslinks (if CSMs as input) or
nothing will be done (if crosslinks as input).
Default: None
-h, --help show this help message and exit
--version show program's version number and exit
"""
######################
import argparse
import numpy as np
import pandas as pd
from typing import List
class MSAnnika_CSM_Grouper:
@staticmethod
def get_crosslink_key(row: pd.Series) -> str:
a = str(row["Sequence A"]) + str(row["Crosslinker Position A"])
b = str(row["Sequence B"]) + str(row["Crosslinker Position B"])
return "-".join(sorted([a, b]))
@staticmethod
def get_nr_proteins(row: pd.Series) -> int:
proteins_str = str(row["Accession A"]).strip(";") + ";" + str(row["Accession B"]).strip(";")
return len(proteins_str.split(";"))
@staticmethod
def get_xl_position_in_protein(row: pd.Series, alpha: bool) -> int:
if alpha:
positions = [float(pos) + float(row["Crosslinker Position A"]) for pos in str(row["A in protein"]).split(";")]
else:
positions = [float(pos) + float(row["Crosslinker Position B"]) for pos in str(row["B in protein"]).split(";")]
return ";".join([str(int(pos)) for pos in positions if not pd.isna(pos)])
@staticmethod
def get_best_csm_score(csms: List[pd.Series]) -> float:
best_score = 0
for csm in csms:
if csm["Combined Score"] > best_score:
best_score = csm["Combined Score"]
return best_score
@staticmethod
def get_decoy_flag(row: pd.Series) -> bool:
return True if "D" in row["Alpha T/D"] or "D" in row["Beta T/D"] else False
@staticmethod
def group(data: pd.DataFrame) -> pd.DataFrame:
crosslinks = dict()
for i, row in data.iterrows():
crosslink = MSAnnika_CSM_Grouper.get_crosslink_key(row)
if crosslink in crosslinks:
crosslinks[crosslink].append(row)
else:
crosslinks[crosslink] = [row]
rows = list()
columns = ["CHECKED",
"Crosslinker",
"Crosslink Type",
"# CSMs",
"# Proteins",
"Sequence A",
"Accession A",
"Position A",
"Sequence B",
"Accession B",
"Position B",
"Protein Descriptions A",
"Protein Descriptions B",
"Best CSM Score",
"In protein A",
"In protein B",
"Decoy",
"Modifications A",
"Modifications B",
"Confidence"]
for crosslink in crosslinks:
row = pd.Series({"CHECKED": False,
"Crosslinker": crosslinks[crosslink][0]["Crosslinker"],
"Crosslink Type": crosslinks[crosslink][0]["Crosslink Type"],
"# CSMs": len(crosslinks[crosslink]),
"# Proteins": MSAnnika_CSM_Grouper.get_nr_proteins(crosslinks[crosslink][0]),
"Sequence A": crosslinks[crosslink][0]["Sequence A"],
"Accession A": crosslinks[crosslink][0]["Accession A"],
"Position A": crosslinks[crosslink][0]["Crosslinker Position A"],
"Sequence B": crosslinks[crosslink][0]["Sequence B"],
"Accession B": crosslinks[crosslink][0]["Accession B"],
"Position B": crosslinks[crosslink][0]["Crosslinker Position B"],
"Protein Descriptions A": crosslinks[crosslink][0]["Accession A"],
"Protein Descriptions B": crosslinks[crosslink][0]["Accession B"],
"Best CSM Score": MSAnnika_CSM_Grouper.get_best_csm_score(crosslinks[crosslink]),
"In protein A": MSAnnika_CSM_Grouper.get_xl_position_in_protein(crosslinks[crosslink][0], True),
"In protein B": MSAnnika_CSM_Grouper.get_xl_position_in_protein(crosslinks[crosslink][0], False),
"Decoy": MSAnnika_CSM_Grouper.get_decoy_flag(crosslinks[crosslink][0]),
"Modifications A": crosslinks[crosslink][0]["Modifications A"],
"Modifications B": crosslinks[crosslink][0]["Modifications B"],
"Confidence": "Unknown"})
rows.append(row)
return pd.concat(rows, ignore_index = True, axis = 1, names = columns).T
class MSAnnika_CSM_Validator:
@staticmethod
def get_class(row: pd.Series) -> str:
return "Decoy" if "D" in row["Alpha T/D"] or "D" in row["Beta T/D"] else "Target"
@staticmethod
def get_cutoff(data: pd.DataFrame, fdr: float) -> float:
data["Class"] = data.apply(lambda row: MSAnnika_CSM_Validator.get_class(row), axis = 1)
data["Class_label"] = data.apply(lambda row: 0 if row["Class"] == "Target" else 1, axis = 1)
labels = data["Class_label"].to_numpy()
labels_sorted = labels[data["Combined Score"].to_numpy().argsort()]
scores = sorted(data["Combined Score"].tolist())
for i, score in enumerate(scores):
if labels_sorted[i:].sum() / (labels_sorted[i:].shape[0] - labels_sorted[i:].sum()) < fdr:
return score
return scores[0]
@staticmethod
def validate(data: pd.DataFrame, fdr: float) -> pd.DataFrame:
cutoff = MSAnnika_CSM_Validator.get_cutoff(data, fdr)
df = data[data["Combined Score"] >= cutoff].copy()
if "Confidence" not in df.columns:
return df
df["Confidence"] = df.apply(lambda row: "High", axis = 1)
return df
class MSAnnika_Crosslink_Validator:
@staticmethod
def get_class(row: pd.Series) -> str:
return "Decoy" if row["Decoy"] else "Target"
@staticmethod
def get_cutoff(data: pd.DataFrame, fdr: float) -> float:
data["Class"] = data.apply(lambda row: MSAnnika_Crosslink_Validator.get_class(row), axis = 1)
data["Class_label"] = data.apply(lambda row: 0 if row["Class"] == "Target" else 1, axis = 1)
labels = data["Class_label"].to_numpy()
labels_sorted = labels[data["Best CSM Score"].to_numpy().argsort()]
scores = sorted(data["Best CSM Score"].tolist())
for i, score in enumerate(scores):
if labels_sorted[i:].sum() / (labels_sorted[i:].shape[0] - labels_sorted[i:].sum()) < fdr:
return score
return scores[0]
@staticmethod
def validate(data: pd.DataFrame, fdr: float) -> pd.DataFrame:
cutoff = MSAnnika_Crosslink_Validator.get_cutoff(data, fdr)
df = data[data["Best CSM Score"] >= cutoff].copy()
df["Confidence"] = df.apply(lambda row: "High", axis = 1)
return df
def main(argv = None) -> List[pd.DataFrame]:
parser = argparse.ArgumentParser()
parser.add_argument(metavar = "f",
dest = "files",
help = "Name/Path of the MS Annika result files to process.",
type = str,
nargs = "+")
parser.add_argument("-fdr", "--false_discovery_rate",
dest = "fdr",
default = None,
help = "FDR for CSM/crosslink validation.",
type = float)
parser.add_argument("--version",
action = "version",
version = __version)
args = parser.parse_args(argv)
if args.fdr is not None:
print(f"Using {float(args.fdr) if float(args.fdr) < 1.0 else float(args.fdr) / 100.0} FDR.")
result_list = list()
for f, file in enumerate(args.files):
df = pd.read_excel(file)
if "Combined Score" in df.columns:
crosslinks = MSAnnika_CSM_Grouper.group(df)
crosslinks.to_excel(".xlsx".join(file.split(".xlsx")[:-1]) + "_crosslinks.xlsx", sheet_name = "Crosslinks", index = False)
result_list.append(crosslinks)
if args.fdr is not None:
validated_csms = MSAnnika_CSM_Validator.validate(df, float(args.fdr) if float(args.fdr) < 1.0 else float(args.fdr) / 100.0)
validated_csms.to_excel(".xlsx".join(file.split(".xlsx")[:-1]) + "_validated.xlsx", sheet_name = "CSMs", index = False)
result_list.append(validated_csms)
validated_crosslinks = MSAnnika_Crosslink_Validator.validate(crosslinks, float(args.fdr) if float(args.fdr) < 1.0 else float(args.fdr) / 100.0)
validated_crosslinks.to_excel(".xlsx".join(file.split(".xlsx")[:-1]) + "_crosslinks_validated.xlsx", sheet_name = "Crosslinks", index = False)
result_list.append(validated_crosslinks)
else:
if args.fdr is not None:
validated_crosslinks = MSAnnika_Crosslink_Validator.validate(df, args.fdr if args.fdr < 1.0 else args.fdr / 100)
validated_crosslinks.to_excel(".xlsx".join(file.split(".xlsx")[:-1]) + "_validated.xlsx", sheet_name = "Crosslinks", index = False)
result_list.append(validated_crosslinks)
else:
print(f"Crosslink file without FDR given. Nothing to do. Skipping file {file}.")
print(f"Processed {f + 1} files...")
print("Done!")
return result_list
if __name__ == "__main__":
r = main()