Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add override_context global permission #5903

Merged
merged 1 commit into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/infrahub/core/constants/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class GlobalPermissions(InfrahubStringEnum):
MANAGE_ACCOUNTS = "manage_accounts"
MANAGE_PERMISSIONS = "manage_permissions"
MANAGE_REPOSITORIES = "manage_repositories"
OVERRIDE_CONTEXT = "override_context"


class PermissionAction(InfrahubStringEnum):
Expand Down
9 changes: 8 additions & 1 deletion backend/infrahub/graphql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from typing import TYPE_CHECKING

from infrahub.core.constants import InfrahubKind
from infrahub.core.constants import GlobalPermissions, InfrahubKind
from infrahub.core.manager import NodeManager
from infrahub.exceptions import NodeNotFoundError, ValidationError
from infrahub.permissions.globals import define_global_permission_from_branch

if TYPE_CHECKING:
from .initialization import GraphqlContext
Expand All @@ -16,6 +17,12 @@ async def apply_external_context(graphql_context: GraphqlContext, context_input:
if not context_input or not context_input.account:
return

permission = define_global_permission_from_branch(
permission=GlobalPermissions.OVERRIDE_CONTEXT, branch_name=graphql_context.branch.name
)

graphql_context.active_permissions.raise_for_permission(permission=permission)

try:
account = await NodeManager.get_one_by_id_or_default_filter(
db=graphql_context.db, id=str(context_input.account.id), kind=InfrahubKind.GENERICACCOUNT
Expand Down
15 changes: 15 additions & 0 deletions backend/infrahub/permissions/globals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from infrahub.core.account import GlobalPermission
from infrahub.core.constants import GLOBAL_BRANCH_NAME, GlobalPermissions, PermissionDecision
from infrahub.core.registry import registry


def define_global_permission_from_branch(permission: GlobalPermissions, branch_name: str) -> GlobalPermission:
if branch_name in (GLOBAL_BRANCH_NAME, registry.default_branch):
decision = PermissionDecision.ALLOW_DEFAULT
else:
decision = PermissionDecision.ALLOW_OTHER

return GlobalPermission(
action=permission.value,
decision=decision.value,
)
59 changes: 59 additions & 0 deletions backend/tests/helpers/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from infrahub.core import registry
from infrahub.core.constants import InfrahubKind
from infrahub.core.node import Node
from infrahub.core.protocols import CoreAccountGroup
from infrahub.database import InfrahubDatabase
from infrahub.permissions import LocalPermissionBackend

if TYPE_CHECKING:
from infrahub.core.account import GlobalPermission, ObjectPermission
from infrahub.database import InfrahubDatabase


async def define_permissions(
account: Node,
db: InfrahubDatabase,
object_permissions: list[ObjectPermission] | None = None,
global_permissions: list[GlobalPermission] | None = None,
) -> None:
registry.permission_backends = [LocalPermissionBackend()]

object_permissions = object_permissions or []
global_permissions = global_permissions or []
permissions = []
for object_permission in object_permissions:
obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION)
await obj.new(
db=db,
namespace=object_permission.namespace,
name=object_permission.name,
action=object_permission.action,
decision=object_permission.decision,
)
await obj.save(db=db)
permissions.append(obj)

for global_permission in global_permissions:
obj = await Node.init(db=db, schema=InfrahubKind.GLOBALPERMISSION)
await obj.new(
db=db,
action=global_permission.action,
decision=global_permission.decision,
)
await obj.save(db=db)
permissions.append(obj)

role = await Node.init(db=db, schema=InfrahubKind.ACCOUNTROLE)
await role.new(db=db, name="chief-people-officer", permissions=permissions)
await role.save(db=db)

group = await Node.init(db=db, schema=CoreAccountGroup)
await group.new(db=db, name="hr", roles=[role])
await group.save(db=db)

await group.members.add(db=db, data={"id": account.id})
await group.members.save(db=db)
67 changes: 66 additions & 1 deletion backend/tests/unit/graphql/mutations/test_mutation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

from typing import TYPE_CHECKING

from infrahub.auth import AccountSession
from infrahub.core.account import GlobalPermission
from infrahub.core.branch import Branch
from infrahub.core.constants import GlobalPermissions, PermissionDecision
from infrahub.database import InfrahubDatabase
from infrahub.events.node_action import NodeMutatedEvent
from infrahub.graphql.initialization import prepare_graphql_params
from infrahub.services import InfrahubServices
from tests.adapters.event import MemoryInfrahubEvent
from tests.helpers.graphql import graphql
from tests.helpers.permissions import define_permissions

if TYPE_CHECKING:
from infrahub.auth import AccountSession
Expand All @@ -22,7 +26,19 @@ async def test_add_context_invalid_account(
default_branch: Branch,
car_person_schema: None,
first_account: Node,
session_first_account: AccountSession,
):
await define_permissions(
account=first_account,
db=db,
global_permissions=[
GlobalPermission(
action=GlobalPermissions.OVERRIDE_CONTEXT.value,
decision=PermissionDecision.ALLOW_ALL.value,
),
],
)

query = """
mutation {
TestPersonCreate(data: {name: { value: "John"}, height: {value: 182}}, context: { account: { id: "very-invalid" }}) {
Expand All @@ -33,7 +49,9 @@ async def test_add_context_invalid_account(
}
}
"""
gql_params = await prepare_graphql_params(db=db, include_subscription=False, branch=default_branch)
gql_params = await prepare_graphql_params(
db=db, include_subscription=False, branch=default_branch, account_session=session_first_account
)
result = await graphql(
schema=gql_params.schema,
source=query,
Expand All @@ -54,6 +72,17 @@ async def test_add_context_valid_account(
first_account: Node,
second_account: Node,
):
await define_permissions(
account=first_account,
db=db,
global_permissions=[
GlobalPermission(
action=GlobalPermissions.OVERRIDE_CONTEXT.value,
decision=PermissionDecision.ALLOW_ALL.value,
),
],
)

query = """
mutation {
TestPersonCreate(data: {name: { value: "John"}, height: {value: 182}}, context: { account: { id: "%s" }}) {
Expand Down Expand Up @@ -86,3 +115,39 @@ async def test_add_context_valid_account(
node_event = memory_event.events[0]
assert isinstance(node_event, NodeMutatedEvent)
assert node_event.meta.account_id == second_account.id


async def test_add_context_missing_permissions(
db: InfrahubDatabase,
default_branch: Branch,
car_person_schema: None,
session_second_account: AccountSession,
first_account: Node,
second_account: Node,
):
query = """
mutation {
TestPersonCreate(data: {name: { value: "John"}, height: {value: 182}}, context: { account: { id: "%s" }}) {
ok
object {
id
}
}
}
""" % (first_account.id)

gql_params = await prepare_graphql_params(
db=db,
include_subscription=False,
branch=default_branch,
account_session=session_second_account,
)
result = await graphql(
schema=gql_params.schema,
source=query,
context_value=gql_params.context,
root_value=None,
variable_values={},
)
assert result.errors
assert "You do not have the following permission: global:override_context:allow_default" in str(result.errors)
35 changes: 4 additions & 31 deletions backend/tests/unit/graphql/test_mutation_relationship.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from infrahub.services import InfrahubServices
from tests.adapters.event import MemoryInfrahubEvent
from tests.helpers.graphql import graphql
from tests.helpers.permissions import define_permissions

if TYPE_CHECKING:
from infrahub.core.branch import Branch
Expand All @@ -41,7 +42,7 @@ async def test_relationship_add(
session_first_account: AccountSession,
first_account: Node,
):
await _define_permissions(
await define_permissions(
account=first_account,
db=db,
object_permissions=[
Expand Down Expand Up @@ -391,7 +392,7 @@ async def test_relationship_groups_add(
session_first_account: AccountSession,
first_account: Node,
):
await _define_permissions(
await define_permissions(
account=first_account,
db=db,
object_permissions=[
Expand Down Expand Up @@ -538,7 +539,7 @@ async def test_relationship_groups_remove(
session_first_account: AccountSession,
first_account: Node,
):
await _define_permissions(
await define_permissions(
account=first_account,
db=db,
object_permissions=[
Expand Down Expand Up @@ -1074,31 +1075,3 @@ async def test_without_permissions(

assert result.errors
assert "You do not have one of the following permissions" in result.errors[0].message


async def _define_permissions(account: Node, db: InfrahubDatabase, object_permissions: list[ObjectPermission]) -> None:
registry.permission_backends = [LocalPermissionBackend()]

permissions = []
for object_permission in object_permissions:
obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION)
await obj.new(
db=db,
namespace=object_permission.namespace,
name=object_permission.name,
action=object_permission.action,
decision=object_permission.decision,
)
await obj.save(db=db)
permissions.append(obj)

role = await Node.init(db=db, schema=InfrahubKind.ACCOUNTROLE)
await role.new(db=db, name="chief-people-officer", permissions=permissions)
await role.save(db=db)

group = await Node.init(db=db, schema=InfrahubKind.ACCOUNTGROUP)
await group.new(db=db, name="hr", roles=[role])
await group.save(db=db)

await group.members.add(db=db, data={"id": account.id})
await group.members.save(db=db)
Loading