Skip to content

Commit de70663

Browse files
authored
#338: separate out transfer phase and make it sub classable (#339)
* #338: completed base transfer strategy and fixed algorithm doc * #338: progress checkpoint of recursive transfer strategy * #338: fixed previous commit (added incorrect file) * #338: object ordering strategy is specific to recursive technique * #338: progress checkpoint * #338: pushed changes to internals to check against CI * #338: completed refactoring to separate out transfer and make it abstract
1 parent 6b0975b commit de70663

4 files changed

+348
-240
lines changed

src/lbaf/Execution/lbsAlgorithmBase.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def execute(self, phases, distributions, statistics, a_min_max):
154154
phases: list of Phase instances
155155
distributions: dictionary of load-varying variables
156156
statistics: dictionary of statistics
157-
a_min_max: possibly empty list of optimal arrangements"""
157+
a_min_max: possibly empty list of optimal arrangements."""
158158

159159
# Must be implemented by concrete subclass
160160
pass

src/lbaf/Execution/lbsInformAndTransferAlgorithm.py

+20-239
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22
import math
33
import random
44
from logging import Logger
5-
from typing import Union
6-
from itertools import accumulate
7-
from bisect import bisect
85

96
from .lbsAlgorithmBase import AlgorithmBase
107
from .lbsCriterionBase import CriterionBase
8+
from .lbsTransferStrategyBase import TransferStrategyBase
119
from ..Model.lbsObjectCommunicator import ObjectCommunicator
1210
from ..Model.lbsPhase import Phase
1311
from ..IO.lbsStatistics import print_function_statistics, inverse_transform_sample, min_Hamming_distance
@@ -17,7 +15,13 @@
1715
class InformAndTransferAlgorithm(AlgorithmBase):
1816
""" A concrete class for the 2-phase gossip+transfer algorithm."""
1917

20-
def __init__(self, work_model, parameters: dict, lgr: Logger, rank_qoi: str, object_qoi: str):
18+
def __init__(
19+
self,
20+
work_model,
21+
parameters: dict,
22+
lgr: Logger,
23+
rank_qoi: str,
24+
object_qoi: str):
2125
""" Class constructor
2226
work_model: a WorkModelBase instance
2327
parameters: a dictionary of parameters
@@ -49,24 +53,6 @@ def __init__(self, work_model, parameters: dict, lgr: Logger, rank_qoi: str, obj
4953
self._logger.info(
5054
f"Instantiated with {self.__n_iterations} iterations, {self.__n_rounds} rounds, fanout {self.__fanout}")
5155

52-
# Select object order strategy
53-
self.__strategy_mapped = {
54-
"arbitrary": self.arbitrary,
55-
"element_id": self.element_id,
56-
"decreasing_loads": self.decreasing_loads,
57-
"increasing_loads": self.increasing_loads,
58-
"increasing_connectivity": self.increasing_connectivity,
59-
"fewest_migrations": self.fewest_migrations,
60-
"small_objects": self.small_objects}
61-
o_s = parameters.get("order_strategy")
62-
if o_s not in self.__strategy_mapped:
63-
self._logger.error(f"{o_s} does not exist in known ordering strategies: "
64-
f"{[x for x in self.__strategy_mapped.keys()]}")
65-
sys.excepthook = exc_handler
66-
raise SystemExit(1)
67-
self.__order_strategy = self.__strategy_mapped[o_s]
68-
self._logger.info(f"Selected {self.__order_strategy.__name__} object ordering strategy")
69-
7056
# Try to instantiate object transfer criterion
7157
self.__transfer_criterion = CriterionBase.factory(
7258
parameters.get("criterion"),
@@ -77,9 +63,17 @@ def __init__(self, work_model, parameters: dict, lgr: Logger, rank_qoi: str, obj
7763
sys.excepthook = exc_handler
7864
raise SystemExit(1)
7965

80-
# Assign optional parameters
81-
self.__deterministic_transfer = parameters.get("deterministic_transfer", False)
82-
self.__max_objects_per_transfer = parameters.get("max_objects_per_transfer", math.inf)
66+
# Try to instantiate object transfer strategy
67+
strat_name = "Recursive"
68+
self.__transfer_strategy = TransferStrategyBase.factory(
69+
strat_name,
70+
parameters,
71+
self.__transfer_criterion,
72+
lgr=self._logger)
73+
if not self.__transfer_strategy:
74+
self._logger.error(f"Could not instantiate a transfer strategy of type {strat_name}")
75+
sys.excepthook = exc_handler
76+
raise SystemExit(1)
8377

8478
def information_stage(self):
8579
""" Execute information stage."""
@@ -144,134 +138,6 @@ def information_stage(self):
144138
if not p.get_load():
145139
continue
146140

147-
def recursive_extended_search(self, pick_list, object_list, c_fct, n_o, max_n_o):
148-
""" Recursively extend search to other objects."""
149-
150-
# Fail when no more objects available or maximum depth is reached
151-
if not pick_list or n_o >= max_n_o:
152-
return False
153-
154-
# Pick one object and move it from one list to the other
155-
o = random.choice(pick_list)
156-
pick_list.remove(o)
157-
object_list.append(o)
158-
n_o += 1
159-
160-
# Decide whether criterion allows for transfer
161-
if c_fct(object_list) < 0.:
162-
# Transfer is not possible, recurse further
163-
return self.recursive_extended_search(
164-
pick_list, object_list, c_fct, n_o, max_n_o)
165-
else:
166-
# Succeed when criterion is satisfied
167-
return True
168-
169-
def transfer_stage(self):
170-
""" Perform object transfer stage."""
171-
172-
# Initialize transfer stage
173-
self._logger.info("Executing transfer phase")
174-
n_ignored, n_transfers, n_rejects = 0, 0, 0
175-
176-
# Biggest transfer (num of object transferred at once)
177-
max_obj_transfers = 0
178-
179-
# Iterate over ranks
180-
for r_src in self._phase.get_ranks():
181-
# Skip workless ranks
182-
if not self._work_model.compute(r_src) > 0.:
183-
continue
184-
185-
# Skip ranks unaware of peers
186-
targets = r_src.get_known_loads()
187-
del targets[r_src]
188-
if not targets:
189-
n_ignored += 1
190-
continue
191-
self._logger.debug(f"Trying to offload from rank {r_src.get_id()} to {[p.get_id() for p in targets]}:")
192-
193-
# Offload objects for as long as necessary and possible
194-
srt_rank_obj = list(self.__order_strategy(
195-
r_src.get_migratable_objects(), r_src.get_id()))
196-
197-
while srt_rank_obj:
198-
# Pick next object in ordered list
199-
o = srt_rank_obj.pop()
200-
object_list = [o]
201-
self._logger.debug(f"* object {o.get_id()}:")
202-
203-
# Initialize destination information
204-
r_dst = None
205-
c_dst = -math.inf
206-
207-
# Use deterministic or probabilistic transfer method
208-
if self.__deterministic_transfer:
209-
# Select best destination with respect to criterion
210-
for r_try in targets.keys():
211-
c_try = self.__transfer_criterion.compute(
212-
[o], r_src, r_try)
213-
if c_try > c_dst:
214-
c_dst = c_try
215-
r_dst = r_try
216-
else:
217-
# Compute transfer CMF given information known to source
218-
p_cmf, c_values = r_src.compute_transfer_cmf(
219-
self.__transfer_criterion, o, targets, False)
220-
self._logger.debug(f"CMF = {p_cmf}")
221-
if not p_cmf:
222-
n_rejects += 1
223-
continue
224-
225-
# Pseudo-randomly select destination proc
226-
r_dst = inverse_transform_sample(p_cmf)
227-
c_dst = c_values[r_dst]
228-
229-
# Handle case where object not suitable for transfer
230-
if c_dst < 0.:
231-
# Give up if no objects left of no rank is feasible
232-
if not srt_rank_obj or not r_dst:
233-
n_rejects += 1
234-
continue
235-
236-
# Recursively extend search if possible
237-
pick_list = srt_rank_obj[:]
238-
success = self.recursive_extended_search(
239-
pick_list,
240-
object_list,
241-
lambda x: self.__transfer_criterion.compute(x, r_src, r_dst),
242-
1,
243-
self.__max_objects_per_transfer)
244-
if success:
245-
# Remove accepted objects from remaining object list
246-
srt_rank_obj = pick_list
247-
else:
248-
# No transferable list of objects was found
249-
n_rejects += 1
250-
continue
251-
252-
# Sanity check before transfer
253-
if r_dst not in r_src.get_known_loads():
254-
self._logger.error(
255-
f"Destination rank {r_dst.get_id()} not in known ranks")
256-
sys.excepthook = exc_handler
257-
raise SystemExit(1)
258-
259-
# Transfer objects
260-
if len(object_list) > max_obj_transfers:
261-
max_obj_transfers = len(object_list)
262-
263-
self._logger.debug(
264-
f"Transferring {len(object_list)} object(s) at once")
265-
for o in object_list:
266-
self._phase.transfer_object(o, r_src, r_dst)
267-
n_transfers += 1
268-
269-
self._logger.info(
270-
f"Maximum number of objects transferred at once: {max_obj_transfers}")
271-
272-
# Return object transfer counts
273-
return n_ignored, n_transfers, n_rejects
274-
275141
def execute(self, phases: list, distributions: dict, statistics: dict, a_min_max):
276142
""" Execute 2-phase gossip+transfer algorithm on Phase instance."""
277143

@@ -297,7 +163,7 @@ def execute(self, phases: list, distributions: dict, statistics: dict, a_min_max
297163
self.information_stage()
298164

299165
# Then execute transfer stage
300-
n_ignored, n_transfers, n_rejects = self.transfer_stage()
166+
n_ignored, n_transfers, n_rejects = self.__transfer_strategy.execute(self._phase)
301167
n_proposed = n_transfers + n_rejects
302168
if n_proposed:
303169
self._logger.info(
@@ -337,88 +203,3 @@ def execute(self, phases: list, distributions: dict, statistics: dict, a_min_max
337203
# Report final mapping in debug mode
338204
self.report_final_mapping(self._logger)
339205

340-
@staticmethod
341-
def arbitrary(objects: set, _):
342-
""" Default: objects are passed as they are stored."""
343-
344-
return objects
345-
346-
@staticmethod
347-
def element_id(objects: set, _):
348-
""" Order objects by ID."""
349-
350-
return sorted(objects, key=lambda x: x.get_id())
351-
352-
@staticmethod
353-
def decreasing_loads(objects: set, _):
354-
""" Order objects by decreasing object loads."""
355-
356-
return sorted(objects, key=lambda x: -x.get_load())
357-
358-
@staticmethod
359-
def increasing_loads(objects: set, _):
360-
""" Order objects by increasing object loads."""
361-
362-
return sorted(objects, key=lambda x: x.get_load())
363-
364-
@staticmethod
365-
def increasing_connectivity(objects: set, src_id):
366-
""" Order objects by increasing local communication volume."""
367-
368-
# Initialize list with all objects without a communicator
369-
no_comm = [
370-
o for o in objects
371-
if not isinstance(o.get_communicator(), ObjectCommunicator)]
372-
373-
# Order objects with a communicator
374-
with_comm = {}
375-
for o in objects:
376-
# Skip objects without a communicator
377-
comm = o.get_communicator()
378-
if not isinstance(o.get_communicator(), ObjectCommunicator):
379-
continue
380-
381-
# Update dict of objects with maximum local communication
382-
with_comm[o] = max(
383-
sum([v for k, v in comm.get_received().items()
384-
if k.get_rank_id() == src_id]),
385-
sum([v for k, v in comm.get_sent().items()
386-
if k.get_rank_id() == src_id]))
387-
388-
# Return list of objects order by increased local connectivity
389-
return no_comm + sorted(with_comm, key=with_comm.get)
390-
391-
@staticmethod
392-
def sorted_ascending(objects: Union[set, list]):
393-
return sorted(objects, key=lambda x: x.get_load())
394-
395-
@staticmethod
396-
def sorted_descending(objects: Union[set, list]):
397-
return sorted(objects, key=lambda x: -x.get_load())
398-
399-
def load_excess(self, objects: set):
400-
rank_load = sum([obj.get_load() for obj in objects])
401-
return rank_load - self.__average_load
402-
403-
def fewest_migrations(self, objects: set, _):
404-
""" First find the load of the smallest single object that, if migrated
405-
away, could bring this rank's load below the target load.
406-
Sort largest to the smallest if <= load_excess
407-
Sort smallest to the largest if > load_excess."""
408-
409-
load_excess = self.load_excess(objects)
410-
lt_load_excess = [obj for obj in objects if obj.get_load() <= load_excess]
411-
get_load_excess = [obj for obj in objects if obj.get_load() > load_excess]
412-
return self.sorted_descending(lt_load_excess) + self.sorted_ascending(get_load_excess)
413-
414-
def small_objects(self, objects: set, _):
415-
""" First find the smallest object that, if migrated away along with all
416-
smaller objects, could bring this rank's load below the target load.
417-
Sort largest to the smallest if <= load_excess
418-
Sort smallest to the largest if > load_excess."""
419-
420-
load_excess = self.load_excess(objects)
421-
sorted_objects = self.sorted_ascending(objects)
422-
accumulated_loads = list(accumulate(obj.get_load() for obj in sorted_objects))
423-
idx = bisect(accumulated_loads, load_excess) + 1
424-
return self.sorted_descending(sorted_objects[:idx]) + self.sorted_ascending(sorted_objects[idx:])

0 commit comments

Comments
 (0)