-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy patharkitscenes_data_utils.py
113 lines (102 loc) · 4.16 KB
/
arkitscenes_data_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
from concurrent import futures as futures
from os import path as osp
import mmengine
import numpy as np
from typing import List, Optional
class ARKitScenesOfflineData:
    """ARKitScenesOfflineData.

    Generate arkitscenes infos (offline benchmark) for indoor_converter.

    Args:
        root_path (str): Root path of the raw data.
        split (str): Split type, 'train' or 'val'.
    """

    def __init__(self, root_path: str, split: str):
        self.split = split
        # Raw ARKitScenes download lives under <root>/3dod; preprocessed
        # per-scene files live under <root>/offline_prepared_data.
        raw_path = os.path.join(root_path, '3dod')
        self.data_path = os.path.join(root_path, 'offline_prepared_data')
        assert split in ['train', 'val']
        class_names = [
            'cabinet', 'refrigerator', 'shelf', 'stove', 'bed',
            'sink', 'washer', 'toilet', 'bathtub', 'oven',
            'dishwasher', 'fireplace', 'stool', 'chair', 'table',
            'tv_monitor', 'sofa'
        ]
        # Map category name -> contiguous integer label id.
        self.name2class = {
            name: i
            for i, name in enumerate(class_names)
        }
        # Prepared files are named '<scene_id>_<suffix>'; keep only scene ids
        # present in both the prepared dir and the requested raw split.
        all_id_list = set(
            map(lambda x: x.split('_')[0],
                os.listdir(self.data_path)))
        split_dir = 'Training' if split == 'train' else 'Validation'
        split_id_list = set(os.listdir(osp.join(raw_path, split_dir)))
        # Sort for deterministic info ordering across runs; the original kept
        # a raw set, so data_list order depended on hash randomization.
        self.sample_id_list = sorted(all_id_list & split_id_list)
        print(f'{split}, raw ids: {len(split_id_list)}, '
              f'processed ids: {len(self.sample_id_list)}')

    def __len__(self) -> int:
        """Length of the dataset."""
        return len(self.sample_id_list)

    def get_infos(self,
                  num_workers: int = 4,
                  has_label: bool = True,
                  sample_id_list: Optional[List[str]] = None) -> dict:
        """Get data infos.

        This method gets information from the raw data.

        Args:
            num_workers (int, optional): Number of threads to be used.
                Default: 4.
            has_label (bool, optional): Whether the data has label.
                Default: True.
            sample_id_list (list[str], optional): Index list of the sample.
                Default: None (use ``self.sample_id_list``).

        Returns:
            dict: ``{'metainfo': ..., 'data_list': ...}`` info dict.
        """

        def process_single_scene(sample_idx: str) -> dict:
            """Build the info dict (point path + instances) for one scene."""
            print(f'{self.split} sample_idx: {sample_idx}', end='\r')
            info = {
                'lidar_points': {
                    'num_pts_feats': 6,
                    'lidar_path': f'{sample_idx}_point.bin'
                }
            }
            boxes = np.load(
                osp.join(self.data_path, f'{sample_idx}_bbox.npy'))
            labels = np.load(
                osp.join(self.data_path, f'{sample_idx}_label.npy'))
            instances = []
            for box, label in zip(boxes, labels):
                # follow heading angle of DepthInstance3DBoxes
                box[-1] = -box[-1]
                instances.append({
                    'bbox_3d': box.tolist(),
                    'bbox_label_3d': self.name2class[label]
                })
            info['instances'] = instances
            return info

        sample_id_list = sample_id_list if sample_id_list is not None \
            else self.sample_id_list
        # All submitted futures are resolved when the executor context exits,
        # so consuming the map iterator below is safe.
        with futures.ThreadPoolExecutor(num_workers) as executor:
            infos = executor.map(process_single_scene, list(sample_id_list))
        infos = {
            'metainfo': {
                'categories': self.name2class,
                'dataset': 'arkitscenes_offline',
                'info_version': '1.0'
            },
            'data_list': list(infos)
        }
        return infos
# do not want to add create_annotations.py to projects
if __name__ == '__main__':
    root_path = '/opt/project/data/arkitscenes'
    out_path = '/opt/project/work_dirs/tmp'
    # Ensure the output directory exists so the dumps below cannot fail on a
    # fresh checkout.
    os.makedirs(out_path, exist_ok=True)
    # Same two dumps as before (train first, then val), deduplicated into a
    # single loop.
    for split in ('train', 'val'):
        infos = ARKitScenesOfflineData(
            root_path=root_path, split=split).get_infos()
        filename = osp.join(
            out_path, f'arkitscenes_offline_infos_{split}.pkl')
        mmengine.dump(infos, filename, 'pkl')