Skip to content

Commit

Permalink
Merge branch 'stable' into pmc-20250114-docs-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
petercrocker authored Jan 14, 2025
2 parents 208fbd4 + dd1cc18 commit b4d2be6
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 31 deletions.
14 changes: 14 additions & 0 deletions backend/infrahub/core/diff/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
from infrahub.core.query.diff import DiffAllPathsQuery
from infrahub.core.timestamp import Timestamp
from infrahub.database import InfrahubDatabase
from infrahub.log import get_logger

from .model.path import CalculatedDiffs, NodeFieldSpecifier

log = get_logger()


class DiffCalculator:
def __init__(self, db: InfrahubDatabase) -> None:
Expand Down Expand Up @@ -41,9 +44,13 @@ async def calculate_diff(
diff_from=from_time,
diff_to=to_time,
)
log.info("Beginning diff calculation query for branch")
await branch_diff_query.execute(db=self.db)
log.info("Diff calculation query for branch complete")
log.info("Reading results of query for branch")
for query_result in branch_diff_query.get_results():
diff_parser.read_result(query_result=query_result)
log.info("Results of query for branch read")

if base_branch.name != diff_branch.name:
new_node_field_specifiers = diff_parser.get_new_node_field_specifiers()
Expand All @@ -61,10 +68,17 @@ async def calculate_diff(
],
new_node_field_specifiers=[(nfs.node_uuid, nfs.field_name) for nfs in new_node_field_specifiers],
)

log.info("Beginning diff calculation query for base")
await base_diff_query.execute(db=self.db)
log.info("Diff calculation query for base complete")
log.info("Reading results of query for base")
for query_result in base_diff_query.get_results():
diff_parser.read_result(query_result=query_result)
log.info("Results of query for base read")
log.info("Parsing calculated diff")
diff_parser.parse(include_unchanged=include_unchanged)
log.info("Calculated diff parsed")
return CalculatedDiffs(
base_branch_name=base_branch.name,
diff_branch_name=diff_branch.name,
Expand Down
12 changes: 11 additions & 1 deletion backend/infrahub/core/diff/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,9 @@ async def _aggregate_enriched_diffs(
else:
end_time = diff_request.to_time
# if there are no changes on either branch in this time range, then there cannot be a diff
log.info(f"Checking number of changes on branches for {diff_request!r}")
log.info(
f"Checking number of changes on branches for {diff_request!r}, from_time={current_time}, to_time={end_time}"
)
num_changes_by_branch = await self.diff_repo.get_num_changes_in_time_range_by_branch(
branch_names=[diff_request.base_branch.name, diff_request.diff_branch.name],
from_time=current_time,
Expand Down Expand Up @@ -520,13 +522,17 @@ async def _concatenate_diffs_and_requests(
previous_diff_pair = single_enriched_diffs
continue

log.info("Combining diffs...")
previous_diff_pair = await self._combine_diffs(earlier=previous_diff_pair, later=single_enriched_diffs)
log.info("Diffs combined.")

return previous_diff_pair

async def _combine_diffs(
self, earlier: EnrichedDiffs | EnrichedDiffsMetadata, later: EnrichedDiffs | EnrichedDiffsMetadata
) -> EnrichedDiffs | EnrichedDiffsMetadata:
log.info(f"Earlier diff to combine: {earlier!r}")
log.info(f"Later diff to combine: {later!r}")
# if one of the diffs is hydrated and has no data, we can combine them without hydrating the other
if isinstance(earlier, EnrichedDiffs) and earlier.is_empty:
later.base_branch_diff.from_time = earlier.base_branch_diff.from_time
Expand All @@ -539,9 +545,13 @@ async def _combine_diffs(

# hydrate the diffs to combine, if necessary
if not isinstance(earlier, EnrichedDiffs):
log.info("Hydrating earlier diff...")
earlier = await self.diff_repo.hydrate_diff_pair(enriched_diffs_metadata=earlier)
log.info("Earlier diff hydrated.")
if not isinstance(later, EnrichedDiffs):
log.info("Hydrating later diff...")
later = await self.diff_repo.hydrate_diff_pair(enriched_diffs_metadata=later)
log.info("Later diff hydrated.")

return await self.diff_combiner.combine(earlier_diffs=earlier, later_diffs=later)

Expand Down
10 changes: 10 additions & 0 deletions backend/infrahub/core/diff/merger/merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from infrahub.core import registry
from infrahub.core.diff.model.path import BranchTrackingId
from infrahub.core.diff.query.merge import DiffMergePropertiesQuery, DiffMergeQuery, DiffMergeRollbackQuery
from infrahub.log import get_logger

if TYPE_CHECKING:
from infrahub.core.branch import Branch
Expand All @@ -14,6 +15,8 @@

from .serializer import DiffMergeSerializer

log = get_logger()


class DiffMerger:
def __init__(
Expand Down Expand Up @@ -41,10 +44,14 @@ async def merge_graph(self, at: Timestamp) -> None:
latest_diff = diff
if latest_diff is None:
raise RuntimeError(f"Missing diff for branch {self.source_branch.name}")
log.info(f"Retrieving diff {latest_diff.uuid}")
enriched_diff = await self.diff_repository.get_one(
diff_branch_name=self.source_branch.name, diff_id=latest_diff.uuid
)
log.info(f"Diff {latest_diff.uuid} retrieved")
batch_num = 0
async for node_diff_dicts, property_diff_dicts in self.serializer.serialize_diff(diff=enriched_diff):
log.info(f"Merging batch of nodes #{batch_num}")
merge_query = await DiffMergeQuery.init(
db=self.db,
branch=self.source_branch,
Expand All @@ -53,6 +60,7 @@ async def merge_graph(self, at: Timestamp) -> None:
node_diff_dicts=node_diff_dicts,
)
await merge_query.execute(db=self.db)
log.info(f"Merging batch of properties #{batch_num}")
merge_properties_query = await DiffMergePropertiesQuery.init(
db=self.db,
branch=self.source_branch,
Expand All @@ -61,6 +69,8 @@ async def merge_graph(self, at: Timestamp) -> None:
property_diff_dicts=property_diff_dicts,
)
await merge_properties_query.execute(db=self.db)
log.info(f"Batch #{batch_num} merged")
batch_num += 1

self.source_branch.branched_from = at.to_string()
await self.source_branch.save(db=self.db)
Expand Down
30 changes: 3 additions & 27 deletions backend/infrahub/core/diff/merger/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ class DiffMergeSerializer:
def __init__(self, db: InfrahubDatabase, max_batch_size: int) -> None:
# Store the database handle and batch-size limit, then start every cache empty.
self.db = db
self.max_batch_size = max_batch_size
# memoized relationship identifiers, keyed by (schema_kind, relationship_name)
self._relationship_id_cache: dict[tuple[str, str], str] = {}
# memoized attribute types, keyed by (schema_kind, attribute_name)
self._attribute_type_cache: dict[tuple[str, str], type] = {}
# branch names are unset (None) at construction time
self._source_branch_name: str | None = None
self._target_branch_name: str | None = None
# {(node_id, relationship_id, peer_id)}
self._conflicted_cardinality_one_relationships: set[tuple[str, str, str]] = set()

def _reset_caches(self) -> None:
# Discard memoized lookups by rebinding each cache to a fresh empty dict.
# The conflicted cardinality-one relationship set is not touched here.
self._relationship_id_cache = {}
self._attribute_type_cache = {}

@property
Expand Down Expand Up @@ -70,20 +68,6 @@ def _get_action(self, action: DiffAction, conflict: EnrichedDiffConflict | None)
def _to_action_str(self, action: DiffAction) -> str:
# Return the action's `.value`, converted to a string and upper-cased.
return str(action.value).upper()

def _get_relationship_identifier(self, schema_kind: str, relationship_name: str) -> str:
# Return the identifier of relationship `relationship_name` on schema `schema_kind`,
# memoizing the answer per (schema_kind, relationship_name) pair.
cache_key = (schema_kind, relationship_name)
# fast path: this pair has already been resolved
if cache_key in self._relationship_id_cache:
return self._relationship_id_cache[cache_key]
# first attempt: resolve the schema and relationship on the source branch
try:
node_schema = self._get_schema(kind=schema_kind, branch_name=self.source_branch_name)
relationship_schema = node_schema.get_relationship(name=relationship_name)
except (SchemaNotFoundError, ValueError):
# the source-branch lookup raised; retry on the target branch.
# An exception from this second lookup is not caught and propagates to the caller.
node_schema = self._get_schema(kind=schema_kind, branch_name=self.target_branch_name)
relationship_schema = node_schema.get_relationship(name=relationship_name)
relationship_identifier = relationship_schema.get_identifier()
# remember the result so later calls for the same pair skip the schema lookup
self._relationship_id_cache[cache_key] = relationship_identifier
return relationship_identifier

def _get_property_type_for_attribute_value(self, schema_kind: str, attribute_name: str) -> type:
cache_key = (schema_kind, attribute_name)
if cache_key in self._attribute_type_cache:
Expand Down Expand Up @@ -132,16 +116,13 @@ def _cache_conflicted_cardinality_one_relationships(self, diff: EnrichedDiffRoot
for prop in element.properties:
if prop.property_type is not DatabaseEdgeType.IS_RELATED:
continue
relationship_identifier = self._get_relationship_identifier(
schema_kind=node.kind, relationship_name=rel.name
)
if prop.previous_value:
self._conflicted_cardinality_one_relationships.add(
(node.uuid, relationship_identifier, prop.previous_value)
(node.uuid, rel.identifier, prop.previous_value)
)
if prop.new_value:
self._conflicted_cardinality_one_relationships.add(
(node.uuid, relationship_identifier, prop.new_value)
(node.uuid, rel.identifier, prop.new_value)
)

async def serialize_diff(
Expand Down Expand Up @@ -181,13 +162,10 @@ async def serialize_diff(
serialized_property_diffs.append(attribute_property_diff)
relationship_diffs: list[RelationshipMergeDict] = []
for rel_diff in node.relationships:
relationship_identifier = self._get_relationship_identifier(
schema_kind=node.kind, relationship_name=rel_diff.name
)
for relationship_element_diff in rel_diff.relationships:
element_diffs, relationship_property_diffs = self._serialize_relationship_element(
relationship_diff=relationship_element_diff,
relationship_identifier=relationship_identifier,
relationship_identifier=rel_diff.identifier,
node_uuid=node.uuid,
)
relationship_diffs.extend(element_diffs)
Expand Down Expand Up @@ -251,8 +229,6 @@ def _serialize_attribute(
# we only delete attributes when the whole attribute is deleted
if action is DiffAction.REMOVED and attribute_diff.action is not DiffAction.REMOVED:
continue
# action = DiffAction.ADDED
# value = "NULL"
prop_dicts.append(
PropertyMergeDict(
property_type=property_diff.property_type.value,
Expand Down
4 changes: 3 additions & 1 deletion backend/infrahub/core/diff/model/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,9 @@ def __repr__(self) -> str:
f"branch_name={self.diff_branch_name},"
f"base_name={self.base_branch_name},"
f"from_time={self.diff_branch_diff.from_time},"
f"to_time={self.diff_branch_diff.to_time})"
f"to_time={self.diff_branch_diff.to_time},"
f"num_base_nodes={len(self.base_branch_diff.nodes)},"
f"num_branch_nodes={len(self.diff_branch_diff.nodes)}"
)

@classmethod
Expand Down
5 changes: 3 additions & 2 deletions backend/infrahub/core/diff/repository/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ def _get_node_create_request_batch(

@retry_db_transaction(name="enriched_diff_save")
async def save(self, enriched_diffs: EnrichedDiffs) -> None:
log.info("Saving diff...")
num_nodes = len(enriched_diffs.base_branch_diff.nodes) + len(enriched_diffs.diff_branch_diff.nodes)
log.info(f"Saving diff (num_nodes={num_nodes})...")
root_query = await EnrichedDiffRootsCreateQuery.init(db=self.db, enriched_diffs=enriched_diffs)
await root_query.execute(db=self.db)
for node_create_batch in self._get_node_create_request_batch(enriched_diffs=enriched_diffs):
Expand Down Expand Up @@ -339,7 +340,7 @@ async def get_num_changes_in_time_range_by_branch(
return query.get_num_changes_by_branch()

async def get_node_field_specifiers(self, diff_id: str) -> set[NodeFieldSpecifier]:
limit = 5000
limit = config.SETTINGS.database.query_size_limit
offset = 0
specifiers: set[NodeFieldSpecifier] = set()
while True:
Expand Down
5 changes: 5 additions & 0 deletions backend/infrahub/core/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from infrahub.core.registry import registry
from infrahub.core.timestamp import Timestamp
from infrahub.exceptions import ValidationError
from infrahub.log import get_logger

from ..git.models import GitRepositoryMerge
from ..workflows.catalogue import GIT_REPOSITORIES_MERGE
Expand All @@ -25,6 +26,8 @@
from infrahub.database import InfrahubDatabase
from infrahub.services import InfrahubServices

log = get_logger()


class BranchMerger:
def __init__(
Expand Down Expand Up @@ -174,9 +177,11 @@ async def merge(
if self.source_branch.name == registry.default_branch:
raise ValidationError(f"Unable to merge the branch '{self.source_branch.name}' into itself")

log.debug("Updating diff for merge")
enriched_diff = await self.diff_coordinator.update_branch_diff_and_return(
base_branch=self.destination_branch, diff_branch=self.source_branch
)
log.debug("Diff updated for merge")
conflict_map = enriched_diff.get_all_conflicts()
errors: list[str] = []
for conflict_path, conflict in conflict_map.items():
Expand Down
4 changes: 4 additions & 0 deletions backend/tests/unit/core/diff/test_diff_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def deleted_person_node_diff(self, person_node_main, car_node_main) -> EnrichedD
)
deleted_relationship = EnrichedRelationshipGroupFactory.build(
name="cars",
identifier="testcar__testperson",
relationships={deleted_relationship_element},
action=DiffAction.UPDATED,
)
Expand Down Expand Up @@ -300,6 +301,7 @@ def added_person_node_diff(self, person_node_branch, car_node_branch) -> Enriche
)
added_relationship = EnrichedRelationshipGroupFactory.build(
name="cars",
identifier="testcar__testperson",
relationships={added_relationship_element},
action=DiffAction.UPDATED,
)
Expand Down Expand Up @@ -549,6 +551,7 @@ def updated_person_node_diff(
)
deleted_relationship = EnrichedRelationshipGroupFactory.build(
name="cars",
identifier="testcar__testperson",
relationships={deleted_relationship_element},
action=DiffAction.UPDATED,
)
Expand Down Expand Up @@ -600,6 +603,7 @@ def updated_car_diff(self, person_node_main, car_node_main, person_node_main2, c
)
updated_relationship = EnrichedRelationshipGroupFactory.build(
name="owner",
identifier="testcar__testperson",
relationships={updated_relationship_element},
action=DiffAction.UPDATED,
)
Expand Down

0 comments on commit b4d2be6

Please sign in to comment.