Skip to content

Commit

Permalink
🔒 Refactor and Stabilize /crypto Module for Production Readiness
Browse files Browse the repository at this point in the history
Summary:
This commit delivers a complete refactor and stabilization of the /crypto module, aligning it with Seigr's architecture, coding standards, and security best practices. The updates address critical issues, improve maintainability, and ensure a robust foundation for cryptographic operations.

Key Changes:

Key Derivation (key_derivation.py)

Refactored key derivation logic with clear separation of responsibilities.
Enhanced error handling with structured logs using ErrorLogEntry.
Standardized key encoding using senary and base64 utilities.
Improved resilience against invalid inputs and salt mismanagement.
Helpers (helpers.py)

Added robust senary encoding/decoding utilities.
Introduced salt application and validation utilities.
Implemented metadata generation and logging alerts.
Constants (constants.py)

Centralized constants for consistent reference across modules.
Ensured proper structure and naming alignment.
Logging and Error Handling

Structured error reporting using Protobuf's ErrorLogEntry.
Added descriptive logs for better traceability of operations.
Pylint Compliance:

Resolved all linting issues, including import errors and unused variables.
Achieved a 10.00/10 Pylint score for /crypto/key_derivation.py.
Updated .pylintrc in the user's home directory with path configurations.
Bug Fixes:

Addressed persistent E0401 import errors caused by PYTHONPATH misconfigurations.
Fixed unused imports and redundant variable assignments.
Improvements:

Adopted robust cryptographic best practices (e.g., PBKDF2-HMAC-SHA256).
Clear and structured logging for debugging and traceability.
Improved module-level documentation and function-level docstrings.
Testing:

Verified import paths and runtime execution with manual and automated testing.
Ensured compliance with Seigr's ecosystem standards.
Next Steps:

Perform end-to-end integration tests across /crypto.
Validate integration with dependent modules (e.g., /identity, /seigr_protocol).
  • Loading branch information
sergism77 committed Jan 3, 2025
1 parent b0b619e commit 23e29c2
Show file tree
Hide file tree
Showing 23 changed files with 2,361 additions and 1,295 deletions.
Empty file added src/__init__.py
Empty file.
238 changes: 118 additions & 120 deletions src/crypto/asymmetric_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,78 @@
import uuid
import time
from datetime import datetime, timezone
from cryptography.hazmat.primitives import hashes, serialization

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.exceptions import InvalidSignature

from src.seigr_protocol.compiled.error_handling_pb2 import (
ErrorLogEntry,
ErrorSeverity,
ErrorResolutionStrategy,
)
from src.seigr_protocol.compiled.encryption_pb2 import AsymmetricKeyPair, SignatureLog
from src.seigr_protocol.compiled.alerting_pb2 import Alert, AlertType, AlertSeverity
from src.crypto.constants import (
SEIGR_CELL_ID_PREFIX,
DEFAULT_HASH_FUNCTION,
SUPPORTED_HASH_ALGORITHMS,
)
from src.crypto.encoding_utils import encode_to_senary
from src.crypto.hypha_crypt import HyphaCrypt
from src.crypto.key_management import generate_rsa_key_pair
from src.crypto.secure_logging import log_secure_action
from src.crypto.constants import SEIGR_CELL_ID_PREFIX

logger = logging.getLogger(__name__)

### Key Generation with Enhanced Error Reporting ###

### 🛡️ Alert Trigger for High-Severity Events ###

def _trigger_alert(
message: str, severity: AlertSeverity, recipient_id: str = None
) -> None:
"""
Trigger an alert for high-severity events.
Args:
message (str): Description of the alert.
severity (AlertSeverity): Severity level of the alert.
recipient_id (str, optional): ID of the affected recipient.
Returns:
None
"""
alert = Alert(
alert_id=f"{SEIGR_CELL_ID_PREFIX}_{uuid.uuid4()}",
message=message,
type=AlertType.ALERT_TYPE_SECURITY,
severity=severity,
timestamp=datetime.now(timezone.utc).isoformat(),
source_component="crypto_module",
affected_entity_id=recipient_id,
)
logger.warning(f"Alert triggered: {alert.message} with severity {severity}")


### 🗝️ Key Generation with Enhanced Error Reporting ###

def generate_key_pair(
key_size: int = 2048, retry_attempts: int = 3
key_size: int = 2048, retry_attempts: int = 3, retry_delay: int = 2
) -> AsymmetricKeyPair:
"""
Generate an RSA key pair with retry logic and structured error handling.
Args:
key_size (int): Size of the RSA key (default: 2048).
retry_attempts (int): Number of retries for key generation.
retry_delay (int): Delay between retries in seconds.
Returns:
AsymmetricKeyPair: Protobuf object containing the key pair.
Raises:
ValueError: If key generation fails after retries.
"""
for attempt in range(retry_attempts):
try:
private_key, public_key = generate_rsa_key_pair(key_size)
Expand Down Expand Up @@ -52,140 +101,74 @@ def generate_key_pair(
logger.warning(f"Key generation attempt {attempt + 1} failed: {str(e)}")
if attempt == retry_attempts - 1:
_trigger_alert(
"Key generation failed after retries",
f"Key generation failed after {retry_attempts} retries",
AlertSeverity.ALERT_SEVERITY_CRITICAL,
)
raise ValueError("Failed to generate RSA key pair after retries") from e
time.sleep(2**attempt) # Exponential backoff for retries

time.sleep(retry_delay * (2**attempt))

### Digital Signature ###

### 🔑 Key Serialization ###

def sign_data(
data: bytes,
private_key_pem: bytes,
use_senary: bool = True,
hash_algorithm=hashes.SHA256,
) -> SignatureLog:
private_key = load_private_key(private_key_pem)
try:
signature = private_key.sign(
data,
padding.PSS(
mgf=padding.MGF1(hash_algorithm()), salt_length=padding.PSS.MAX_LENGTH
),
hash_algorithm(),
)

data_hash = hashes.Hash(hash_algorithm())
data_hash.update(data)
signed_data_hash = data_hash.finalize()
signed_data_hash = (
encode_to_senary(signed_data_hash) if use_senary else signed_data_hash
)

signature_log = SignatureLog(
log_id=f"{SEIGR_CELL_ID_PREFIX}_{uuid.uuid4()}",
signer_id="signer_identifier",
signature=signature,
signing_algorithm="RSA-SHA256",
signed_data_hash=signed_data_hash,
timestamp=datetime.now(timezone.utc).isoformat(),
metadata={
"context": "sample_signing_operation",
"seigr_protocol": "active",
"data_type": "general",
},
)
log_secure_action(
"Data signed", {"log_id": signature_log.log_id, "algorithm": "RSA-SHA256"}
)
return signature_log
except Exception as e:
logger.error(f"Signing failed: {str(e)}")
_trigger_alert(
"Signing operation failed", AlertSeverity.ALERT_SEVERITY_CRITICAL
)
raise ValueError("Data signing failed") from e


### Key Verification ###


def verify_signature(data: bytes, signature: bytes, public_key_pem: bytes) -> bool:
public_key = load_public_key(public_key_pem)
try:
public_key.verify(
signature,
data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256(),
)
log_secure_action(
"Signature verified", {"data": encode_to_senary(data[:10]), "result": True}
)
return True
except InvalidSignature:
log_secure_action(
"Signature verification failed",
{"data": encode_to_senary(data[:10]), "result": False},
)
return False


### Serialization Functions ###
def serialize_public_key(public_key) -> bytes:
"""
Serialize an RSA public key to PEM format.
Args:
public_key (rsa.RSAPublicKey): RSA public key object.
def serialize_public_key(public_key) -> bytes:
pem = public_key.public_bytes(
Returns:
bytes: PEM-encoded public key.
"""
return public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
log_secure_action("Public key serialized", {"protocol": "Seigr"})
return pem


def serialize_private_key(private_key, encryption_password: bytes = None) -> bytes:
"""
Serialize an RSA private key to PEM format.
Args:
private_key (rsa.RSAPrivateKey): RSA private key object.
encryption_password (bytes, optional): Password for encryption.
Returns:
bytes: PEM-encoded private key.
"""
encryption_algo = (
serialization.BestAvailableEncryption(encryption_password)
if encryption_password
else serialization.NoEncryption()
)
pem = private_key.private_bytes(
return private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=encryption_algo,
)
log_secure_action("Private key serialized", {"protocol": "Seigr"})
return pem


### Load and Error-Resilient Retry Logic ###
### 🔄 Key Loading with Retry Logic ###

def load_private_key(pem_data: bytes, password: bytes = None, retry_attempts: int = 2):
"""
Load an RSA private key from PEM data with retry logic.
def load_public_key(pem_data: bytes, retry_attempts: int = 2):
for attempt in range(retry_attempts):
try:
public_key = serialization.load_pem_public_key(pem_data)
log_secure_action("Public key loaded", {"protocol": "Seigr"})
return public_key
except Exception as e:
logger.warning(f"Attempt {attempt + 1} to load public key failed: {str(e)}")
if attempt == retry_attempts - 1:
_trigger_alert(
"Public key load failed", AlertSeverity.ALERT_SEVERITY_WARNING
)
raise ValueError("Failed to load RSA public key") from e
time.sleep(2**attempt) # Exponential backoff
Args:
pem_data (bytes): PEM-encoded private key.
password (bytes, optional): Password for the private key.
retry_attempts (int): Number of retry attempts.
Returns:
rsa.RSAPrivateKey: Loaded RSA private key object.
def load_private_key(pem_data: bytes, retry_attempts: int = 2):
Raises:
ValueError: If the private key fails to load after retries.
"""
for attempt in range(retry_attempts):
try:
private_key = serialization.load_pem_private_key(pem_data, password=None)
private_key = serialization.load_pem_private_key(pem_data, password=password)
log_secure_action("Private key loaded", {"protocol": "Seigr"})
return private_key
except Exception as e:
Expand All @@ -197,22 +180,37 @@ def load_private_key(pem_data: bytes, retry_attempts: int = 2):
"Private key load failed", AlertSeverity.ALERT_SEVERITY_WARNING
)
raise ValueError("Failed to load RSA private key") from e
time.sleep(2**attempt) # Exponential backoff
time.sleep(2**attempt)


### Alert Trigger for High-Severity Events ###
### ✍️ Digital Signature Using Seigr Hashing ###

def sign_data(data: bytes, private_key_pem: bytes, use_senary: bool = True) -> SignatureLog:
"""
Sign data with a private RSA key using hypha_hash.
def _trigger_alert(
message: str, severity: AlertSeverity, recipient_id: str = None
) -> None:
alert = Alert(
alert_id=f"{SEIGR_CELL_ID_PREFIX}_{uuid.uuid4()}",
message=message,
type=AlertType.ALERT_TYPE_SECURITY, # Specify the alert type as security
severity=severity,
Args:
data (bytes): Data to sign.
private_key_pem (bytes): PEM-encoded private key.
use_senary (bool): Whether to encode the hash in senary format.
Returns:
SignatureLog: Protobuf log containing signature metadata.
"""
private_key = load_private_key(private_key_pem)
signed_hash = HyphaCrypt.hypha_hash(data, algorithm=DEFAULT_HASH_FUNCTION)
signature = private_key.sign(
signed_hash.encode(),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH,
),
hashes.SHA256(),
)

return SignatureLog(
log_id=f"{SEIGR_CELL_ID_PREFIX}_{uuid.uuid4()}",
signature=signature,
signing_algorithm=DEFAULT_HASH_FUNCTION,
timestamp=datetime.now(timezone.utc).isoformat(),
source_component="crypto_module",
affected_entity_id=recipient_id,
)
logger.warning(f"Alert triggered: {alert.message} with severity {severity}")
Loading

0 comments on commit 23e29c2

Please sign in to comment.