diff --git a/src/crypto/cbor_utils.py b/src/crypto/cbor_utils.py index 36f2f11..07480a8 100644 --- a/src/crypto/cbor_utils.py +++ b/src/crypto/cbor_utils.py @@ -13,17 +13,19 @@ logger = logging.getLogger(__name__) -### ๐Ÿ›ก๏ธ Alert Trigger for Critical Issues ### +# ๐Ÿ›ก๏ธ Alert Trigger def _trigger_alert(message: str, severity: int) -> None: """ - Triggers an alert for critical failures. + Trigger an alert for critical failures. Args: - message (str): Description of the issue. - severity (int): The severity level of the alert. + message (str): Description of the alert. + severity (int): Severity level as defined in AlertSeverity enum. + + Raises: + None """ severity_enum = AlertSeverity.Name(severity) if severity in AlertSeverity.values() else "ALERT_SEVERITY_UNSPECIFIED" - alert = Alert( alert_id=f"{SEIGR_CELL_ID_PREFIX}_{uuid.uuid4()}", message=message, @@ -44,12 +46,25 @@ def _trigger_alert(message: str, severity: int) -> None: category="Alert", message=message, sensitive=False, - use_senary=False + use_senary=False, ) -### ๐Ÿ”„ Data Transformation ### +# ๐Ÿ”„ Data Transformation def transform_data(value, use_senary=False): + """ + Transform data recursively based on type. + + Args: + value: Data to be transformed. + use_senary (bool): Whether to encode/decode using senary encoding. + + Returns: + Transformed data based on type. + + Raises: + TypeError: If the data type is unsupported. + """ if isinstance(value, bytes): return encode_to_senary(value) if use_senary else value if isinstance(value, dict): @@ -60,82 +75,116 @@ def transform_data(value, use_senary=False): return decode_from_senary(value) if use_senary and is_senary(value) else value if isinstance(value, (int, float, bool)) or value is None: return value - raise TypeError(f"Unsupported data type: {type(value).__name__}") -### ๐Ÿ“ CBOR Encoding ### +# ๐Ÿ“ CBOR Encoding def encode_data(data, use_senary=False) -> EncryptedData: + """ + Encode data into CBOR format and wrap it in an EncryptedData object. + + Args: + data: The data to encode. + use_senary (bool): Whether to use senary encoding. + + Returns: + EncryptedData: Encoded data wrapped in EncryptedData object. + + Raises: + ValueError: If encoding fails. + """ try: transformed_data = transform_data(data, use_senary=use_senary) encoded = cbor2.dumps(transformed_data) - logger.debug("%s Data encoded to CBOR format successfully", SEIGR_CELL_ID_PREFIX) if _secure_logger_instance: _secure_logger_instance.log_audit_event( severity=AlertSeverity.ALERT_SEVERITY_INFO, category="Encode", message="Data successfully encoded to CBOR format", sensitive=False, - use_senary=use_senary + use_senary=use_senary, ) return EncryptedData(ciphertext=encoded) except Exception as e: - _trigger_alert("CBOR encoding error occurred", AlertSeverity.ALERT_SEVERITY_CRITICAL) + _trigger_alert(f"Encoding failed: {e}", AlertSeverity.ALERT_SEVERITY_CRITICAL) raise ValueError("CBOR encoding error occurred") from e -### ๐Ÿ› ๏ธ CBOR Decoding ### +# ๐Ÿ› ๏ธ CBOR Decoding def decode_data(encrypted_data: EncryptedData, use_senary=False): - if not encrypted_data or not hasattr(encrypted_data, 'ciphertext') or not encrypted_data.ciphertext: + """ + Decode CBOR data from an EncryptedData object. + + Args: + encrypted_data (EncryptedData): The encrypted data object. + use_senary (bool): Whether to use senary encoding. + + Returns: + Decoded and transformed data. + + Raises: + ValueError: If decoding fails or data is invalid. + """ + if not encrypted_data or not encrypted_data.ciphertext: _trigger_alert("Invalid EncryptedData object for decoding", AlertSeverity.ALERT_SEVERITY_CRITICAL) raise ValueError("Invalid EncryptedData object for decoding") - try: decoded = cbor2.loads(encrypted_data.ciphertext) + transformed = transform_data(decoded, use_senary=use_senary) if _secure_logger_instance: _secure_logger_instance.log_audit_event( severity=AlertSeverity.ALERT_SEVERITY_INFO, category="Decode", message="Data successfully decoded from CBOR format", sensitive=False, - use_senary=use_senary + use_senary=use_senary, ) - return decoded - except (cbor2.CBORDecodeEOF, cbor2.CBORDecodeError) as e: - _trigger_alert("CBOR decode error", AlertSeverity.ALERT_SEVERITY_CRITICAL) - raise ValueError("CBOR decode error") from e - except Exception as e: - _trigger_alert("Unexpected decoding failure", AlertSeverity.ALERT_SEVERITY_CRITICAL) + return transformed + except cbor2.CBORDecodeError as e: + _trigger_alert(f"CBOR decode error: {e}", AlertSeverity.ALERT_SEVERITY_CRITICAL) raise ValueError("CBOR decode error") from e -### ๐Ÿ’พ File Operations ### +# ๐Ÿ’พ Save to File def save_to_file(data, file_path, use_senary=False): + """ + Save data to a CBOR file. + + Args: + data: Data to be saved. + file_path (str): Path to save the file. + use_senary (bool): Whether to use senary encoding. + + Raises: + IOError: If saving fails. + """ try: encoded_data = encode_data(data, use_senary=use_senary) with open(file_path, "wb") as file: file.write(encoded_data.ciphertext) - logger.info("%s Data saved to file: %s", SEIGR_CELL_ID_PREFIX, file_path) - if _secure_logger_instance: - _secure_logger_instance.log_audit_event( - severity=AlertSeverity.ALERT_SEVERITY_INFO, - category="File Save", - message=f"Data saved to {file_path}", - sensitive=False, - use_senary=use_senary - ) except Exception as e: _trigger_alert(f"Failed to save data to file: {file_path}", AlertSeverity.ALERT_SEVERITY_CRITICAL) - raise Exception("File error") from e + raise IOError("Failed to save file") from e + +# ๐Ÿ’พ Load from File +def load_from_file(file_path: str): + """ + Load and decode data from a CBOR file. + + Args: + file_path (str): Path to load the file from. -def load_from_file(file_path, use_senary=False): + Returns: + Decoded data. + + Raises: + IOError: If loading fails. + """ try: with open(file_path, "rb") as file: - cbor_data = file.read() - encrypted_data = EncryptedData(ciphertext=cbor_data) - logger.info("%s Data loaded from file: %s", SEIGR_CELL_ID_PREFIX, file_path) - return decode_data(encrypted_data, use_senary=use_senary) + encrypted_data = EncryptedData(ciphertext=file.read()) + return decode_data(encrypted_data) except Exception as e: _trigger_alert(f"Failed to load data from file: {file_path}", AlertSeverity.ALERT_SEVERITY_CRITICAL) - raise Exception("File error") from e + raise IOError("Failed to load file") from e diff --git a/src/crypto/helpers.py b/src/crypto/helpers.py index 905da69..35c4b74 100644 --- a/src/crypto/helpers.py +++ b/src/crypto/helpers.py @@ -59,7 +59,7 @@ def encode_to_senary(binary_data: bytes, width: int = 2) -> str: """ try: senary_str = "".join(_base6_encode(byte).zfill(width) for byte in binary_data) - logger.debug(f"{SEIGR_CELL_ID_PREFIX} Encoded to senary: {senary_str}") + logger.debug(f"{SEIGR_CELL_ID_PREFIX} Senary encoding successful.") return senary_str except Exception as e: error_log = ErrorLogEntry( diff --git a/tests/crypto/test_cbor_utils.py b/tests/crypto/test_cbor_utils.py index 5012d1c..895d2b3 100644 --- a/tests/crypto/test_cbor_utils.py +++ b/tests/crypto/test_cbor_utils.py @@ -1,6 +1,5 @@ import pytest -import os -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, patch, call from src.crypto.cbor_utils import ( encode_data, decode_data, @@ -8,21 +7,38 @@ save_to_file, load_from_file, ) -from src.crypto.constants import SEIGR_CELL_ID_PREFIX from src.seigr_protocol.compiled.encryption_pb2 import EncryptedData +from src.crypto.constants import SEIGR_CELL_ID_PREFIX +from src.seigr_protocol.compiled.alerting_pb2 import AlertSeverity +from src.crypto.secure_logging import _secure_logger_instance + +# Initialize _secure_logger_instance if it's not already initialized +if _secure_logger_instance is None: + _secure_logger_instance = MagicMock() ### โœ… Fixture for Secure Logger Initialization ### @pytest.fixture(autouse=True) -def initialize_secure_logger(): - """Ensure the SecureLogger instance is initialized before each test.""" - from src.crypto.secure_logging import _initialize_secure_logger - _initialize_secure_logger() +def mock_secure_logger(mocker): + """ + Automatically mock the secure logger for every test. + """ + if not isinstance(_secure_logger_instance.log_audit_event, MagicMock): + mock_logger = mocker.patch.object( + _secure_logger_instance, + 'log_audit_event', + autospec=True + ) + else: + mock_logger = _secure_logger_instance.log_audit_event + return mock_logger ### ๐Ÿงช Test Data Transformation ### def test_transform_data(): - """Test transform_data function handles various data types correctly.""" + """ + Test transform_data handles various data types correctly. + """ assert transform_data(b"bytes") == b"bytes" assert transform_data({"key": "value"}) == {"key": "value"} assert transform_data([1, 2, 3]) == [1, 2, 3] @@ -36,7 +52,9 @@ def test_transform_data(): ### ๐Ÿงช Test CBOR Encoding ### def test_encode_data(): - """Test successful CBOR encoding.""" + """ + Test successful CBOR encoding. + """ data = {"key": "value"} result = encode_data(data) assert isinstance(result, EncryptedData) @@ -44,7 +62,9 @@ def test_encode_data(): def test_encode_data_failure(mocker): - """Test CBOR encoding failure.""" + """ + Test CBOR encoding failure. + """ mocker.patch('cbor2.dumps', side_effect=Exception("Mocked failure")) with pytest.raises(ValueError, match="CBOR encoding error occurred"): encode_data({"key": "value"}) @@ -52,7 +72,9 @@ def test_encode_data_failure(mocker): ### ๐Ÿงช Test CBOR Decoding ### def test_decode_data(): - """Test successful CBOR decoding.""" + """ + Test successful CBOR decoding. + """ data = {"key": "value"} encrypted = encode_data(data) decoded = decode_data(encrypted) @@ -60,64 +82,92 @@ def test_decode_data(): def test_decode_invalid_cbor_data(): - """Test decoding malformed CBOR data raises ValueError.""" + """ + Test decoding malformed CBOR data raises ValueError. + """ invalid_data = EncryptedData(ciphertext=b'\x9f\x9f\x00') with pytest.raises(ValueError, match="CBOR decode error"): decode_data(invalid_data) def test_decode_empty_ciphertext(): - """Test decoding empty ciphertext.""" + """ + Test decoding empty ciphertext raises ValueError. + """ empty_encrypted_data = EncryptedData(ciphertext=b'') - with pytest.raises(ValueError, match="CBOR decode error"): + with pytest.raises(ValueError, match="Invalid EncryptedData object for decoding"): decode_data(empty_encrypted_data) -### ๐Ÿงช Test Malicious Payload Decoding ### def test_decode_malicious_payload(): - """Test decoding a potentially malicious payload.""" - malicious_data = EncryptedData(ciphertext=b'\x00\x01\x02') + """ + Test decoding a malicious payload should raise ValueError. + """ + malicious_data = EncryptedData(ciphertext=b'\x9f\x9f\x00') with pytest.raises(ValueError, match="CBOR decode error"): decode_data(malicious_data) -### ๐Ÿงช Test Invalid EncryptedData Object ### def test_invalid_encrypted_data_object(): - """Test decoding invalid EncryptedData object.""" - with pytest.raises(ValueError, match="Invalid EncryptedData object for decoding."): + """ + Test decoding an invalid EncryptedData object raises ValueError. + """ + with pytest.raises(ValueError, match="Invalid EncryptedData object for decoding"): decode_data(None) ### ๐Ÿงช Test Secure Logging ### -def test_secure_logging_on_encode_decode(mocker): - from src.crypto.secure_logging import _secure_logger_instance - - mock_logger = mocker.patch.object( - _secure_logger_instance, 'log_audit_event', autospec=True - ) - +def test_secure_logging_on_encode_decode(mock_secure_logger): + """ + Test secure logging during encode and decode operations. + """ data = {"key": "value"} encrypted = encode_data(data) decode_data(encrypted) - assert mock_logger.call_count >= 2 - -def test_secure_logging_on_error(mocker): - from src.crypto.secure_logging import _secure_logger_instance - - mock_logger = mocker.patch.object( - _secure_logger_instance, 'log_audit_event', autospec=True - ) - + # Ensure log_audit_event was called twice: once on encode, once on decode + assert mock_secure_logger.call_count >= 2 + mock_secure_logger.assert_has_calls([ + call( + severity=AlertSeverity.ALERT_SEVERITY_INFO, + category="Encode", + message="Data successfully encoded to CBOR format", + sensitive=False, + use_senary=False + ), + call( + severity=AlertSeverity.ALERT_SEVERITY_INFO, + category="Decode", + message="Data successfully decoded from CBOR format", + sensitive=False, + use_senary=False + ) + ]) + + +def test_secure_logging_on_error(mock_secure_logger): + """ + Test secure logging during decode error scenarios. + """ invalid_data = EncryptedData(ciphertext=b'\x9f\x9f\x00') - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="CBOR decode error"): decode_data(invalid_data) - assert mock_logger.call_count > 0 + assert mock_secure_logger.call_count > 0 + mock_secure_logger.assert_any_call( + severity=AlertSeverity.ALERT_SEVERITY_CRITICAL, + category="Alert", + message="CBOR decode error", + sensitive=False, + use_senary=False + ) + ### ๐Ÿงช Test File Operations ### def test_save_and_load_from_file(tmp_path): - """Test saving to and loading from a CBOR file.""" + """ + Test saving and loading data from a CBOR file. + """ file_path = tmp_path / "test_file.cbor" data = {"key": "value"} @@ -128,35 +178,43 @@ def test_save_and_load_from_file(tmp_path): def test_save_to_file_failure(mocker, tmp_path): - """Test failure during file save.""" - mocker.patch('builtins.open', side_effect=Exception("File error")) + """ + Test failure during file save. + """ + mocker.patch('builtins.open', side_effect=IOError("Failed to save file")) file_path = tmp_path / "test_file.cbor" - with pytest.raises(Exception, match="File error"): + with pytest.raises(IOError, match="Failed to save file"): save_to_file({"key": "value"}, file_path) def test_load_from_file_failure(mocker, tmp_path): - """Test failure during file load.""" - mocker.patch('builtins.open', side_effect=Exception("File error")) + """ + Test failure during file load. + """ + mocker.patch('builtins.open', side_effect=IOError("Failed to load file")) file_path = tmp_path / "test_file.cbor" - with pytest.raises(Exception, match="File error"): + with pytest.raises(IOError, match="Failed to load file"): load_from_file(file_path) ### ๐Ÿงช Test Edge Cases ### def test_encode_empty_data(): - """Test encoding an empty dictionary.""" + """ + Test encoding and decoding an empty dictionary. + """ test_data = {} encoded = encode_data(test_data) - assert isinstance(encoded.ciphertext, bytes), "Encoded data should be in bytes format." + assert isinstance(encoded.ciphertext, bytes) decoded = decode_data(encoded) - assert decoded == test_data, "Decoded data should match the original empty dictionary." + assert decoded == test_data def test_encode_large_data(): - """Test encoding and decoding a large dataset.""" + """ + Test encoding and decoding a large dataset. + """ large_data = {f"key_{i}": i for i in range(10000)} encoded = encode_data(large_data)