Skip to content

Commit f88d2de

Browse files
#422: Stream data to vt-tv for rendering (#463)
* #422: Begin experimenting with VTDataWriter to pass data to vt-tv * #422: Add configuration parameters for vttv call and continue data payload serialization * #422: Clean up call to vt-tv * #422: try serializing data * #16: fix missing communications * #422: add max_volume as an object qoi; fix reader * #422: try putting vttv import in a try/except block for ci * #422: add warning for when viz is enabled but vttv not found --------- Co-authored-by: Caleb Schilly <cwschilly@gmail.com>
1 parent a76502a commit f88d2de

File tree

6 files changed

+101
-18
lines changed

6 files changed

+101
-18
lines changed

config/test-vt-tv.yaml

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Specify input
2+
from_data:
3+
data_stem: /home/pierrelp/Develop/NGA/vt-tv/source/tests/unit/lb_test_data/data
4+
phase_ids:
5+
- 0
6+
- 1
7+
- 2
8+
- 3
9+
- 4
10+
- 5
11+
- 6
12+
- 7
13+
check_schema: False
14+
15+
# Specify work model
16+
work_model:
17+
name: AffineCombination
18+
parameters:
19+
alpha: 1.0
20+
beta: 1.0e-08
21+
gamma: 0.0
22+
23+
# Specify algorithm
24+
algorithm:
25+
name: PhaseStepper
26+
27+
# Specify output
28+
output_dir: ../output
29+
output_file_stem: output_file
30+
visualization:
31+
x_ranks: 2
32+
y_ranks: 2
33+
z_ranks: 1
34+
object_jitter: 0.5
35+
rank_qoi: load
36+
object_qoi: load
37+
save_meshes: true
38+
force_continuous_object_qoi: true
39+
output_visualization_dir: ../output
40+
output_visualization_file_stem: output_file
41+
42+
write_JSON:
43+
compressed: False
44+
suffix: json
45+
communications: True
46+
offline_LB_compatible: True

src/lbaf/Applications/LBAF_app.py

+40-16
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
import importlib
77
import yaml
88

9+
try:
10+
import vttv
11+
using_vttv = True
12+
except ModuleNotFoundError:
13+
using_vttv = False
14+
915
# pylint:disable=C0413:wrong-import-position
1016
# Use lbaf module from source if lbaf package is not installed
1117
if importlib.util.find_spec('lbaf') is None:
@@ -14,7 +20,6 @@
1420
from lbaf import PROJECT_PATH, __version__
1521
from lbaf.Execution.lbsRuntime import Runtime
1622
from lbaf.IO.lbsConfigurationValidator import ConfigurationValidator
17-
from lbaf.IO.lbsVisualizer import Visualizer
1823
from lbaf.IO.lbsVTDataReader import LoadReader
1924
from lbaf.IO.lbsVTDataWriter import VTDataWriter
2025
from lbaf.Model.lbsPhase import Phase
@@ -113,6 +118,11 @@ def init_parameters(self, config: dict, base_dir: str):
113118

114119
# Parse visualizer parameters when available
115120
if (viz := config.get("visualization")) is not None:
121+
122+
# Ensure that vttv module was found
123+
if not using_vttv:
124+
self.__logger.warning("Visualization enabled but vttv not found. No visualization will be generated.")
125+
116126
# Retrieve mandatory visualization parameters
117127
try:
118128
self.grid_size = []
@@ -570,21 +580,35 @@ def run(self):
570580
self.__parameters.object_qoi
571581
]
572582

573-
# Instantiate and execute visualizer
574-
visualizer = Visualizer(
575-
self.__logger,
576-
qoi_request,
577-
self.__parameters.continuous_object_qoi,
578-
phases,
579-
self.__parameters.grid_size,
580-
self.__parameters.object_jitter,
581-
self.__parameters.output_dir,
582-
self.__parameters.output_file_stem,
583-
runtime.get_distributions(),
584-
runtime.get_statistics())
585-
visualizer.generate(
586-
self.__parameters.save_meshes,
587-
not self.__parameters.rank_qoi is None)
583+
# Call vttv visualization
584+
if using_vttv:
585+
self.__logger.info("Calling vt-tv")
586+
587+
# Serialize data to JSON-formatted string
588+
self.__rank_phases = {}
589+
for p in phases.values():
590+
for r in p.get_ranks():
591+
self.__rank_phases.setdefault(r.get_id(), {})
592+
self.__rank_phases[r.get_id()][p.get_id()] = r
593+
594+
ranks_json_str = []
595+
for i in range(len(self.__rank_phases.items())):
596+
ranks_json_str.append(self.__json_writer._json_serializer((i, self.__rank_phases[i])))
597+
598+
vttv_params = {
599+
"x_ranks": self.__parameters.grid_size[0],
600+
"y_ranks": self.__parameters.grid_size[1],
601+
"z_ranks": self.__parameters.grid_size[2],
602+
"object_jitter": self.__parameters.object_jitter,
603+
"rank_qoi": self.__parameters.rank_qoi,
604+
"object_qoi": self.__parameters.object_qoi,
605+
"save_meshes": self.__parameters.save_meshes,
606+
"force_continuous_object_qoi": self.__parameters.continuous_object_qoi,
607+
"output_visualization_dir": self.__parameters.output_dir,
608+
"output_visualization_file_stem": self.__parameters.output_file_stem
609+
}
610+
num_ranks = self.__parameters.grid_size[0] * self.__parameters.grid_size[1] * self.__parameters.grid_size[2]
611+
vttv.tvFromJson(ranks_json_str, str(vttv_params), num_ranks)
588612

589613
# Report on rebalanced phase when available
590614
if rebalanced_phase:

src/lbaf/IO/lbsVTDataReader.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> tuple:
221221
for k, v in comm.items():
222222
self.__logger.debug(f"{k}: {v}")
223223
else:
224-
self.__communications_dict[phase_id] = {}
224+
self.__communications_dict.setdefault(phase_id, {rank_id: {}})
225225

226226
# Instantiante rank for current phase
227227
phase_rank = Rank(self.__logger, rank_id)

src/lbaf/IO/lbsVTDataWriter.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ def write(self, phases: dict):
209209
for p in phases.values():
210210
for r in p.get_ranks():
211211
self.__rank_phases.setdefault(r.get_id(), {})
212-
self.__rank_phases[r.get_id()][p.get_id()]= r
212+
self.__rank_phases[r.get_id()][p.get_id()] = r
213213

214214
# Prevent recursion overruns
215215
sys.setrecursionlimit(25000)

src/lbaf/Model/lbsObject.py

+4
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ def get_sent_volume(self) -> float:
126126
"""Return volume of communications sent by object."""
127127
return sum([v for v in self.__communicator.get_sent().values()]) if self.__communicator else 0
128128

129+
def get_max_volume(self) -> float:
130+
"""Return the maximum bytes received or sent by object."""
131+
return self.__communicator.get_max_volume() if self.__communicator else 0
132+
129133
def set_rank_id(self, r_id: int) -> None:
130134
"""Assign object to rank ID"""
131135
self.__rank_id = r_id

src/lbaf/Model/lbsObjectCommunicator.py

+9
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ def get_sent_to_object(self, o):
5656
"""Return the volume of a message received from an object if any."""
5757
return self.__sent.get(o)
5858

59+
def get_max_volume(self):
60+
"""Return the maximum bytes received or sent at this communicator."""
61+
max_received, max_sent = 0., 0.
62+
if len(self.__sent) > 0:
63+
max_sent = max(self.__sent.values())
64+
if len(self.__received) > 0:
65+
max_received = max(self.__received.values())
66+
return max(max_received, max_sent)
67+
5968
def summarize(self) -> tuple:
6069
"""Summarize communicator properties and check for errors."""
6170
# Summarize sent communications

0 commit comments

Comments
 (0)