-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathnature_id.py
executable file
·537 lines (453 loc) · 20.9 KB
/
nature_id.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
#!/usr/bin/env python3
import numpy as np
from PIL import Image, ImageOps
import csv, sys, os, time
import inat_taxonomy
try:
# try importing TensorFlow Lite first
import tflite_runtime.interpreter as tflite
except Exception:
try:
# TensorFlow Lite not found, try to import full TensorFlow
import tensorflow.lite as tflite
except Exception:
print('Error: TensorFlow Lite could not be loaded.', file=sys.stderr)
print(' Follow instructions at https://www.tensorflow.org/lite/'
'guide/python to install it.', file=sys.stderr)
sys.exit(1)
# Installation directory as provided by module inat_taxonomy (according to
# the original note: the directory where this Python script is located).
INSTALL_DIR = inat_taxonomy.INSTALL_DIR
# This directory contains models, label files, and taxonomy files.
CLASSIFIER_DIRECTORY = os.path.join(INSTALL_DIR, 'classifiers')
# These flags can be modified with command-line options; the script section
# at the end of this file overwrites them with the parsed arguments.
scientific_names_only = False # only scientific names or also common names
label_scores_only = False # scores for labels or hierarchical; also set to
                          # True when the iNat taxonomy cannot be loaded
all_common_names = False # show only one or all common names
result_sz = 5 # result size (for label_scores_only); the command line
              # accepts values between 1 and 100
# This class is used by class Taxonomy.
class Taxon:
    """A single node of the taxonomic tree.

    Attributes other than ``taxon_id`` start out empty and are filled in by
    whoever builds the tree.
    """

    def __init__(self, taxon_id):
        # id used for internal lookups and for iNat API calls
        self.taxon_id = taxon_id
        # taxonomic rank level (e.g. species, genus, family), unset for now
        self.rank_level = None
        # scientific name, unset for now
        self.name = None
        # common name; stays None when there is none
        self.common_name = None
        # direct child taxa
        self.children = []
        # Indices into the score vector. There can be more than one when an
        # old model is used whose taxa have since been lumped together.
        self.leaf_class_ids = []

    def add_child(self, child_taxon):
        """Append child_taxon to the list of children."""
        self.children.append(child_taxon)

    def get_rank(self):
        """Return the taxonomic rank as a string ('' for pseudo-kingdoms)."""
        is_pseudo_kingdom = self.taxon_id < 0
        if not is_pseudo_kingdom:
            return inat_taxonomy.get_rank_name(self.rank_level)
        # negative ids are only expected at kingdom level
        assert self.rank_level == inat_taxonomy.KINGDOM_RANK_LEVEL
        return ''

    def get_name(self):
        """Return the name to display; customize here to show common names
        differently."""
        if not self.common_name:
            return self.name
        return f'{self.common_name} ({self.name})'
# This taxonomy is represented in terms of instances of class Taxon.
class Taxonomy:
    """Taxonomic tree of one classifier, built from instances of Taxon.

    ``root`` is an artificial top node, ``id2taxon`` maps taxon ids to Taxon
    instances while the tree is being built (it is deleted at the end of
    read_taxonomy()), and ``idx2label`` maps an index of the model's score
    vector to the label displayed for it.
    """

    def __init__(self):
        # The taxonomy file may contain multiple trees, one for each kingdom.
        # In order to have a single tree for prediction, we add a node for
        # Life as the parent of all kingdoms. This will be the root of our
        # tree.
        self.root = Taxon(inat_taxonomy.ROOT_TAXON_ID)
        self.root.name = inat_taxonomy.ROOT_NAME
        self.root.rank_level = inat_taxonomy.ROOT_RANK_LEVEL
        self.id2taxon = {self.root.taxon_id: self.root}
        self.idx2label = {}

    def reset(self):
        """Drop all taxa and labels; only the root node is kept."""
        self.root.children = []
        self.id2taxon = {self.root.taxon_id: self.root}
        self.idx2label = {}

    def taxonomy_available(self):
        """Return True if there is at least one taxon below the root."""
        return len(self.root.children) > 0

    def read_taxonomy(self, filename):
        """Read a taxonomy file or a label file and get ready to predict.

        Rows that have an 'id' column are label rows ('id', 'name'). All
        other rows are taxonomy rows with the columns 'parent_taxon_id',
        'taxon_id', 'rank_level', 'leaf_class_id' (';'-separated indices,
        may be empty) and 'name'.

        After a label file has been read, and unless label_scores_only is
        set, the taxonomic tree is computed from the labels and written to a
        taxonomy file whose name is `filename` with 'labelmap' replaced by
        'taxonomy'. Common names are added unless scientific_names_only is
        set. `self.id2taxon` is deleted before returning.
        """
        start_time = time.time()
        self.reset()
        with open(filename, newline='', encoding='latin-1') as csvfile:
            for row in csv.DictReader(csvfile):
                if 'id' in row:  # this is a label file
                    self.idx2label[int(row['id'])] = row['name']
                    continue

                taxon_id = int(row['taxon_id'])
                if taxon_id in self.id2taxon:
                    taxon = self.id2taxon[taxon_id]  # inserted earlier as parent
                else:
                    self.id2taxon[taxon_id] = taxon = Taxon(taxon_id)
                taxon.name = row['name']

                # rank levels are whole numbers or fractional values
                if row['rank_level'].isdigit():
                    taxon.rank_level = int(row['rank_level'])
                else:
                    taxon.rank_level = float(row['rank_level'])

                if len(row['leaf_class_id']):
                    for leaf_class_id in row['leaf_class_id'].split(';'):
                        leaf_class_id = int(leaf_class_id)
                        taxon.leaf_class_ids.append(leaf_class_id)
                        self.idx2label[leaf_class_id] = taxon.name

                # rows without a parent hang directly below the root
                if len(row['parent_taxon_id']):
                    parent_taxon_id = int(row['parent_taxon_id'])
                else:
                    parent_taxon_id = self.root.taxon_id
                if parent_taxon_id not in self.id2taxon:
                    # placeholder; name and rank are filled in when the
                    # parent's own row is read
                    self.id2taxon[parent_taxon_id] = Taxon(parent_taxon_id)
                self.id2taxon[parent_taxon_id].add_child(taxon)

        if not self.taxonomy_available():
            # We parsed a label file; unless told otherwise, we use these
            # labels to build a taxonomic tree.
            print(f"Read {len(self.idx2label):,} labels from '{filename}' "
                  f"in {time.time() - start_time:.1f} secs.")
            if not label_scores_only:
                self.compute_taxonomic_tree()
                if self.taxonomy_available():
                    taxonomy_filename = filename.replace('labelmap',
                                                         'taxonomy')
                    # If the name does not contain 'labelmap' the result is
                    # the label file itself; never overwrite our input.
                    if taxonomy_filename != filename:
                        self.write_taxonomic_tree(taxonomy_filename)
        else:
            print(f"Read taxonomy from '{filename}' in "
                  f"{time.time() - start_time:.1f} secs: "
                  f"{len(self.id2taxon) - 1:,} taxa including "
                  f"{len(self.idx2label):,} leaf taxa.")

        if not scientific_names_only and self.taxonomy_available():
            inat_taxonomy.annotate_common_names(self.id2taxon,
                                                all_common_names)
            if label_scores_only:
                self.annotate_labels_with_common_names()

        del self.id2taxon  # not needed anymore

    def annotate_labels_with_common_names(self):
        """Replace the labels by the display names of their taxa."""
        for taxon in self.id2taxon.values():
            for leaf_class_id in taxon.leaf_class_ids:
                self.idx2label[leaf_class_id] = taxon.get_name()

    def write_row(self, writer, taxon, parent_taxon_id):
        """Write the row for `taxon` and, recursively, for all taxa below it.

        `writer` is a csv writer; `parent_taxon_id` is '' for taxa directly
        below the root.
        """
        writer.writerow([parent_taxon_id, taxon.taxon_id, taxon.rank_level,
                         ';'.join(str(leaf_id)
                                  for leaf_id in taxon.leaf_class_ids),
                         taxon.name])
        for child in taxon.children:
            self.write_row(writer, child, taxon.taxon_id)

    def write_taxonomic_tree(self, filename):
        """Write the tree below the root to taxonomy file `filename`.

        This is best effort: a failure is reported and a partially written
        file is removed, no exception is propagated.
        """
        try:
            with open(filename, 'w', newline='',
                      encoding='latin-1') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['parent_taxon_id', 'taxon_id', 'rank_level',
                                 'leaf_class_id', 'name'])
                for child in self.root.children:
                    self.write_row(writer, child, '')
                print(f"Taxonomy written to file '{filename}'.")
        except Exception as e:
            # Deliberately broad: the taxonomy file is only a cache, so
            # neither I/O nor encoding problems may abort the program.
            print(f"Failure writing taxonomy to file '{filename}':", str(e))
            try:
                os.remove(filename)
            except OSError:
                pass  # nothing was created or it cannot be removed

    def compute_taxonomic_tree(self):
        """Build the taxonomic tree from the labels in `self.idx2label`.

        Called after loading a label file for Google's AIY Vision Kit. Adds
        all the labels' direct and indirect ancestors to compute the
        taxonomic tree. Labels that are not found are inserted below the
        root as pseudo-kingdoms with negative ids. If the iNat taxonomy
        cannot be loaded, the global label_scores_only is set and no tree is
        built.
        """
        global label_scores_only
        if not inat_taxonomy.load_inat_taxonomy():
            label_scores_only = True
            return

        start_time = time.time()
        new_id = 0  # id's we add on the fly for pseudo-kingdoms
        for idx, name in self.idx2label.items():
            inat_taxa = inat_taxonomy.lookup_id(name)
            if not inat_taxa:
                print(f"Info: Taxon for label '{name}' not found, "
                      "inserting as pseudo-kingdom.")
                new_id -= 1
                taxon_id = new_id
                self.id2taxon[taxon_id] = taxon = Taxon(taxon_id)
                taxon.rank_level = inat_taxonomy.KINGDOM_RANK_LEVEL
                taxon.name = name
                taxon.leaf_class_ids = [idx]
                self.root.add_child(taxon)
                continue

            inat_taxon, ancestors = inat_taxa
            if name != inat_taxon.name:
                print(f"Info: Taxon '{name}' changed to "
                      f"'{inat_taxon.name}', iNat taxa "
                      f"id {inat_taxon.id}.")

            # ancestor taxa: reuse the ones we already have, add the others
            prev_ancestor = self.root
            for ancestor in ancestors:
                if ancestor.id in self.id2taxon:
                    prev_ancestor = self.id2taxon[ancestor.id]
                else:
                    self.id2taxon[ancestor.id] = ancestor_taxon = \
                        Taxon(ancestor.id)
                    ancestor_taxon.name = ancestor.name
                    ancestor_taxon.rank_level = ancestor.rank_level
                    prev_ancestor.add_child(ancestor_taxon)
                    prev_ancestor = ancestor_taxon

            # this taxon
            if inat_taxon.id in self.id2taxon:
                taxon = self.id2taxon[inat_taxon.id]
                assert taxon.name == inat_taxon.name
                assert taxon.rank_level == inat_taxon.rank_level
            else:
                self.id2taxon[inat_taxon.id] = taxon = Taxon(inat_taxon.id)
                taxon.name = inat_taxon.name
                taxon.rank_level = inat_taxon.rank_level
                prev_ancestor.add_child(taxon)
            taxon.leaf_class_ids.append(idx)

        print("Computed taxonomic tree from labels in "
              f"{time.time() - start_time:.1f} secs: {len(self.id2taxon)-1:,} "
              f"taxa including {len(self.idx2label):,} leaf taxa.")

    def assign_scores(self, taxon, scores):
        """Set `score` on `taxon` and on all taxa below it.

        A taxon's score is the sum of the scores of its own leaf class ids
        plus the scores of all its children.
        """
        taxon.score = 0.0
        for leaf_class_id in taxon.leaf_class_ids:
            taxon.score += scores[leaf_class_id]
        for child in taxon.children:
            self.assign_scores(child, scores)
            taxon.score += child.score

    def prediction(self, scores):
        """Turn the model's score vector into a result list.

        If label_scores_only is set, returns up to result_sz pairs
        (score, label) for the labels with the highest non-zero scores,
        best first; scores are normalized by the sum of all scores.

        Otherwise returns a list of 4-tuples (score, taxon_id, taxonomic
        rank, display name) ordered by taxonomic rank from kingdom down to
        e.g. species. The path ends early at a taxon whose best child has
        less than half of that taxon's score.
        """
        if label_scores_only:
            # argpartition() raises if asked for more entries than exist,
            # e.g. when result_sz exceeds the number of labels of the model.
            count = min(result_sz, len(scores))
            if count <= 0:
                return []
            total = np.sum(scores)
            indices = np.argpartition(scores, -count)[-count:]
            results = [(scores[i] / total, self.idx2label[i])
                       for i in indices if scores[i] != 0]
            results.sort(reverse=True)
            return results

        # annotate all taxa across the hierarchy with scores.
        self.assign_scores(self.root, scores)

        # return one hierarchical path guided by scores
        path = []
        if not self.root.score > 0:
            # Nothing scored at all; avoid dividing by zero below.
            return path
        taxon = self.root
        while taxon.children:
            # Find child with highest score; the first one wins a tie.
            best_child = None
            for child in taxon.children:
                if not best_child or child.score > best_child.score:
                    best_child = child
            # Truncate path if all the other children combined are better
            if best_child.score < 0.5 * taxon.score:
                break
            path.append((best_child.score / self.root.score,
                         best_child.taxon_id, best_child.get_rank(),
                         best_child.get_name()))
            taxon = best_child
        return path
#
# Offline image classification.
#
class OfflineClassifier:
    """Classifies images offline with a TensorFlow Lite model."""

    def __init__(self, filenames):
        """Load the model and its labels or taxonomy.

        `filenames` is a pair: the path of the .tflite model followed by the
        path of the label or taxonomy .csv file.
        """
        # Range that pixel values are scaled to for float32 models.
        self.min_pixel_value = 0.0
        self.max_pixel_value = 255.0
        if os.path.basename(filenames[0]) in ['optimized_model.tflite',
                                              'optimized_model_v1.tflite']:
            self.min_pixel_value = -1.0
            self.max_pixel_value = 1.0

        # Load TFLite model and allocate tensors.
        self.mInterpreter = tflite.Interpreter(model_path=filenames[0])
        self.mInterpreter.allocate_tensors()

        # Get input and output tensors.
        self.mInput_details = self.mInterpreter.get_input_details()
        self.mOutput_details = self.mInterpreter.get_output_details()

        # Read labels or taxonomy
        self.mTaxonomy = Taxonomy()
        self.mTaxonomy.read_taxonomy(filenames[1])

    def classify_image(self, image_filename):
        """Classify one image file.

        Returns the result of Taxonomy.prediction() for the image, or an
        empty list (after printing an error) if the image cannot be loaded
        or cannot be converted to mode RGB.
        """
        start_time = time.time()
        try:
            with Image.open(image_filename) as opened:
                # Rotate image if needed as it may contain an EXIF
                # orientation tag. This also decodes the pixel data while
                # the file is still open, so broken files fail here and the
                # file can be closed right away.
                img = ImageOps.exif_transpose(opened)
        except Exception:
            print(f"Error: cannot load image '{image_filename}'.")
            return []

        if img.mode != 'RGB':
            # e.g. greyscale, palette or RGBA images; the model needs RGB
            try:
                img = img.convert('RGB')
            except Exception:
                print(f"Error: image '{image_filename}' is of mode "
                      f"'{img.mode}', only mode RGB is supported.")
                return []

        model_size = tuple(self.mInput_details[0]['shape'][1:3])
        # square target shape expected by crop code below
        assert model_size[0] == model_size[1]

        if img.size != model_size:
            # We need to scale and maybe want to crop image.
            width, height = img.size
            if width != height:
                # Before scaling, we crop the image to its centered square.
                # Integer arithmetic keeps the box exactly square.
                left, top, right, bottom = 0, 0, width, height
                if width < height:
                    top = (height - width) // 2
                    bottom = top + width
                else:
                    left = (width - height) // 2
                    right = left + height
                img = img.crop((left, top, right, bottom))
            # scale image
            img = img.resize(model_size)

        # pixels are in range 0 ... 255, turn into numpy array with a
        # leading batch dimension of size one
        input_dtype = self.mInput_details[0]['dtype']
        input_data = np.array([np.array(img, input_dtype)])
        if input_dtype == np.float32:
            # map 0 ... 255 to min_pixel_value ... max_pixel_value
            input_data *= (self.max_pixel_value - self.min_pixel_value) / 255.0
            input_data += self.min_pixel_value

        self.mInterpreter.set_tensor(self.mInput_details[0]['index'],
                                     input_data)
        self.mInterpreter.invoke()
        output_data = self.mInterpreter.get_tensor(
            self.mOutput_details[0]['index'])
        path = self.mTaxonomy.prediction(output_data[0])

        print()
        print(f"Classification of '{image_filename}' took "
              f"{time.time() - start_time:.1f} secs.")
        return path
# Returns a dictionary that maps available classifiers to a pair of filenames.
# File names that identify a model exactly; all other files are matched by
# keyword, see _model_for_file().
_EXACT_MODEL_FILES = {
    'taxonomy_v2_13.csv': 'v2_13',
    'taxonomy_v1.csv': 'Seek',
    'optimized_model_v2_13.tflite': 'v2_13',
    'optimized_model_v1.tflite': 'Seek',
}


def _model_for_file(filename):
    """Return the model name a classifier file belongs to, or None."""
    if filename in _EXACT_MODEL_FILES:
        return _EXACT_MODEL_FILES[filename]
    for keyword in ('birds', 'insects', 'plants'):
        if keyword in filename:
            return keyword
    return None


def get_installed_models():
    """Find the classifiers installed in CLASSIFIER_DIRECTORY.

    Returns a dictionary that maps the name of each complete classifier to
    a pair (path of .tflite file, path of .csv file). For the .csv file, a
    taxonomy file is preferred over a file ending in 'labelmap.csv'.
    Classifiers that lack one of the two files are reported and left out.

    Exits the program with status 1 if the directory does not exist or no
    complete classifier is found.
    """
    if not os.path.isdir(CLASSIFIER_DIRECTORY):
        print("Cannot load classifiers, directory "
              f"'{CLASSIFIER_DIRECTORY}' does not exist.", file=sys.stderr)
        sys.exit(1)

    found = {}  # model name -> (tflite path or None, csv path or None)
    # sorted, so that the outcome does not depend on the directory order
    for filename in sorted(os.listdir(CLASSIFIER_DIRECTORY)):
        is_csv = filename.endswith(".csv")
        if not is_csv and not filename.endswith(".tflite"):
            continue
        model = _model_for_file(filename)
        if not model:
            continue
        path = os.path.join(CLASSIFIER_DIRECTORY, filename)
        tflite_file, csv_file = found.get(model, (None, None))
        if not is_csv:
            tflite_file = path
        elif not csv_file or csv_file.endswith('labelmap.csv'):
            # a label file is only a fallback; keep a taxonomy file if we
            # already have one
            csv_file = path
        found[model] = (tflite_file, csv_file)

    models = {}
    for name, (tflite_file, csv_file) in found.items():
        if tflite_file and csv_file:
            models[name] = (tflite_file, csv_file)
            continue
        tf_missing = ".csv file but no .tflite file"
        csv_missing = ".tflite file but no .csv file"
        print("Installation issue: Excluding incomplete classifier for"
              f" '{name}': {tf_missing if csv_file else csv_missing}.")

    if not models:
        print(f"No classifiers found in directory '{CLASSIFIER_DIRECTORY}'; "
              "follow instructions in "
              f"'{os.path.join(CLASSIFIER_DIRECTORY,'README.md')}'"
              " to install them.", file=sys.stderr)
        sys.exit(1)
    return models
def identify_species(classifier, filename):
    """Classify image `filename` with `classifier` and print the result.

    Each result entry is printed on its own line. Entries are either pairs
    (score, label) or tuples (score, taxon id, taxonomic rank, name) ordered
    by taxonomic rank from kingdom down to species. Nothing is printed for
    an empty result.
    """
    for entry in classifier.classify_image(filename) or ():
        percent = 100 * entry[0]
        if len(entry) == 2:  # labels only
            line = f'{percent:5.1f}% {entry[1]}'
        else:
            line = f'{percent:5.1f}% {entry[2]:11s} {entry[3]}'
        print(line)
# command-line parsing
# Note: this scans the classifier directory when the module is loaded, also
# on a plain import, and get_installed_models() exits the program if the
# directory is missing or holds no complete classifier. The result maps
# model names to pairs of filenames and is used by model_parameter_check()
# and by the script section below.
models = get_installed_models()
def model_parameter_check(arg):
    """argparse type check for the model option.

    Returns `arg` unchanged if it names an installed model (a key of the
    module-level `models`); otherwise raises argparse.ArgumentTypeError
    with a message that lists the available models.
    """
    # Imported here because the module itself imports argparse only when it
    # is run as a script.
    import argparse

    if arg in models:
        return arg
    plural = '' if len(models) == 1 else 's'
    available = ', '.join(f"'{m}'" for m in models)
    raise argparse.ArgumentTypeError(
        f"Model '{arg}' not available. Available model{plural}: "
        f"{available}.")
def result_size_check(arg):
    """argparse type check for the result size option.

    Returns `arg` converted to int if it is a number between 1 and 100;
    otherwise raises argparse.ArgumentTypeError.
    """
    # Imported here because the module itself imports argparse only when it
    # is run as a script.
    import argparse

    # isdecimal() rather than isdigit(): the latter also accepts characters
    # such as superscript digits, for which int() raises ValueError.
    if arg.isdecimal() and 0 < int(arg) <= 100:
        return int(arg)
    raise argparse.ArgumentTypeError(f"'{arg}' is not a number "
                                     "between 1 and 100.")
def file_directory_check(arg):
    """argparse type check for the file/directory arguments.

    Returns `arg` unchanged if it is an existing file or directory;
    otherwise raises argparse.ArgumentTypeError.
    """
    # Imported here because the module itself imports argparse only when it
    # is run as a script.
    import argparse

    if os.path.isdir(arg) or os.path.isfile(arg):
        return arg
    raise argparse.ArgumentTypeError(f"'{arg}' is not a file or directory.")
#
# Identify species for picture files and directories given as command line args
#
if __name__ == '__main__':
    import argparse

    preferred1 = 'v2_13'  # default if this model is available
    preferred2 = 'Seek'   # second preference

    parser = argparse.ArgumentParser()
    if len(models) == 1 or preferred1 in models or preferred2 in models:
        # There is an obvious default: a preferred model or the only one.
        default_model = preferred1 if preferred1 in models else \
                        preferred2 if preferred2 in models else \
                        next(iter(models))
        parser.add_argument("-m", "--model", type=model_parameter_check,
                            default=default_model,
                            help="Model to load to identify organisms.")
    else:  # no default for classification model
        parser.add_argument("-m", "--model", type=model_parameter_check,
                            required=True,
                            help="Model to load to identify organisms.")
    parser.add_argument('-a', '--all_common_names', action="store_true",
                        help='Show all common names and not just one.')
    parser.add_argument('-l', '--label_scores_only', action="store_true",
                        help='Compute and display only label scores, '
                        'do not propagate scores up the hierarchy.')
    parser.add_argument('-s', '--scientific_names_only', action="store_true",
                        help='Only use scientific names, do not load common '
                        'names.')
    parser.add_argument('-r', '--result_size', type=result_size_check,
                        default=result_sz, help='Number of labels and their '
                        'scores to report in results.')
    parser.add_argument('files_dirs', metavar='file/directory',
                        type=file_directory_check, nargs='+',
                        help='Image files or directories with images.')
    args = parser.parse_args()

    # The classes above read these module-level flags.
    scientific_names_only = args.scientific_names_only
    label_scores_only = args.label_scores_only
    all_common_names = args.all_common_names
    result_sz = args.result_size

    # make classifier instance
    classifier = OfflineClassifier(models[args.model])

    # process photos
    image_extensions = ('.jpg', '.jpeg', '.png')
    for arg in args.files_dirs:
        if os.path.isfile(arg):
            # files named explicitly are classified whatever their extension
            identify_species(classifier, arg)
        elif os.path.isdir(arg):
            # sorted, so that the output order is reproducible
            for file in sorted(os.listdir(arg)):
                ext = os.path.splitext(file)[1].lower()
                if ext in image_extensions:
                    identify_species(classifier, os.path.join(arg, file))