From bcea9ab71c242fa1f5c309075a5231a2116af09c Mon Sep 17 00:00:00 2001
From: MoonBoi9001
Date: Fri, 10 Jan 2025 15:29:41 +0000
Subject: [PATCH 1/5] Create poi discrepancy notification forwarder

---
 .github/workflows/test.yml                    |  57 ++++
 .gitignore                                    |  65 +++++-
 compose/.env.example                          |   8 +
 compose/dev.dependencies.yml                  |  40 +++
 services/poi_monitor/Dockerfile               |  19 ++
 services/poi_monitor/README.md                | 204 ++++++++++++
 .../001_create_notifications_table.py         |  49 +++
 .../migrations/003_add_poi_set_column.sql     |  12 +
 services/poi_monitor/requirements-dev.txt     |   3 +
 services/poi_monitor/requirements.txt         |   6 +
 services/poi_monitor/setup.py                 |  15 +
 services/poi_monitor/src/__init__.py          |  21 ++
 services/poi_monitor/src/analyzer.py          | 291 ++++++++++++++++++
 services/poi_monitor/src/database.py          | 194 ++++++++++++
 services/poi_monitor/src/migration.py         |  82 +++++
 services/poi_monitor/src/monitor.py           |  64 ++++
 services/poi_monitor/src/notification.py      |  89 ++++++
 17 files changed, 1213 insertions(+), 6 deletions(-)
 create mode 100644 .github/workflows/test.yml
 create mode 100644 compose/.env.example
 create mode 100644 compose/dev.dependencies.yml
 create mode 100644 services/poi_monitor/Dockerfile
 create mode 100644 services/poi_monitor/README.md
 create mode 100644 services/poi_monitor/migrations/001_create_notifications_table.py
 create mode 100644 services/poi_monitor/migrations/003_add_poi_set_column.sql
 create mode 100644 services/poi_monitor/requirements-dev.txt
 create mode 100644 services/poi_monitor/requirements.txt
 create mode 100644 services/poi_monitor/setup.py
 create mode 100644 services/poi_monitor/src/__init__.py
 create mode 100644 services/poi_monitor/src/analyzer.py
 create mode 100644 services/poi_monitor/src/database.py
 create mode 100644 services/poi_monitor/src/migration.py
 create mode 100644 services/poi_monitor/src/monitor.py
 create mode 100644 services/poi_monitor/src/notification.py

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..09f5e7b
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,57 @@
+name: Tests
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+
+    services:
+      postgres:
+        image: postgres
+        env:
+          POSTGRES_USER: postgres
+          POSTGRES_PASSWORD: password
+          POSTGRES_DB: graphix
+        ports:
+          # The container listens on the default 5432 (service containers
+          # cannot override the command), so map host 5433 -> container 5432.
+          - 5433:5432
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.12'
+
+      - name: Install dependencies
+        run: |
+          cd services/poi_monitor
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+          pip install -r requirements-dev.txt
+
+      - name: Run tests
+        run: |
+          cd services/poi_monitor
+          pytest tests/ --cov=src --cov-report=xml
+        env:
+          POSTGRES_HOST: localhost
+          POSTGRES_PORT: 5433
+          POSTGRES_USER: postgres
+          POSTGRES_PASSWORD: password
+          POSTGRES_DB: graphix
+
+      - name: Upload coverage report
+        uses: codecov/codecov-action@v3
+        with:
+          file: ./services/poi_monitor/coverage.xml
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index f9ffaa5..59ecb0b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,11 +1,64 @@
-rustc-ice*
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+venv/
+.venv/
+ENV/
 
-### IDEs 
-.vscode/settings.json +# IDEs .idea/ +.vscode/ +*.swp +*.swo + +# Environment variables +.env + +# Coverage +.coverage +coverage.xml +htmlcov/ + +# Logs +*.log + +# Database +*.sqlite3 -### OS +# Rust +/target/ + +# OS specific .DS_Store +Thumbs.db + +# Debug files +rustc-ice* + +# Docker and Compose +compose/grafana/data/ +compose/data/ +compose/prometheus/data/ +*.db -### Rust -/target +# Environment variables +compose/*.env diff --git a/compose/.env.example b/compose/.env.example new file mode 100644 index 0000000..b9da60b --- /dev/null +++ b/compose/.env.example @@ -0,0 +1,8 @@ +GRAPHIX_DB_URL=postgresql://postgres:password@localhost:5433/graphix +POSTGRES_DB=graphix +POSTGRES_USER=postgres +POSTGRES_PASSWORD=password +POSTGRES_PORT=5433 +CHECK_INTERVAL=300 +GRAPHIX_API_URL=http://host.docker.internal:8000/graphql +SLACK_WEBHOOK_URL=your_webhook_url_here diff --git a/compose/dev.dependencies.yml b/compose/dev.dependencies.yml new file mode 100644 index 0000000..675213e --- /dev/null +++ b/compose/dev.dependencies.yml @@ -0,0 +1,40 @@ +version: "3" + +services: + postgres: + image: postgres + restart: unless-stopped + ports: + - "${POSTGRES_PORT:-5433}:${POSTGRES_PORT:-5433}" + environment: + POSTGRES_USER: ${POSTGRES_USER:-postgres} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-password} + POSTGRES_DB: ${POSTGRES_DB:-graphix} + PGDATA: "/var/lib/postgresql/data" + command: -p ${POSTGRES_PORT:-5433} + healthcheck: + test: ["CMD-SHELL", "pg_isready -h localhost -p ${POSTGRES_PORT:-5433} -U ${POSTGRES_USER:-postgres}"] + interval: 1s + timeout: 1s + retries: 1000 + volumes: + - ./data/postgres:/var/lib/postgresql/data + + poi-monitor: + build: + context: ../services/poi_monitor + dockerfile: Dockerfile + environment: + - POSTGRES_DB=${POSTGRES_DB:-graphix} + - POSTGRES_USER=${POSTGRES_USER:-postgres} + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} + - POSTGRES_HOST=postgres + - POSTGRES_PORT=${POSTGRES_PORT:-5433} + - GRAPHIX_API_URL=${GRAPHIX_API_URL:-http://host.docker.internal:8000/graphql} + - SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL} + - CHECK_INTERVAL=${CHECK_INTERVAL:-300} + depends_on: + postgres: + condition: service_healthy + extra_hosts: + - "host.docker.internal:host-gateway" \ No newline at end of file diff --git a/services/poi_monitor/Dockerfile b/services/poi_monitor/Dockerfile new file mode 100644 index 0000000..6ca21ca --- /dev/null +++ b/services/poi_monitor/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +COPY requirements-dev.txt . +RUN pip install --no-cache-dir -r requirements.txt +RUN if [ "${ENVIRONMENT}" = "development" ]; then pip install --no-cache-dir -r requirements-dev.txt; fi + +# Copy application code +COPY src/ src/ +COPY migrations/ migrations/ + +# Set environment variables +ENV PYTHONUNBUFFERED=1 + +# Run the monitor +CMD ["python", "-m", "src.monitor"] diff --git a/services/poi_monitor/README.md b/services/poi_monitor/README.md new file mode 100644 index 0000000..85c1bf4 --- /dev/null +++ b/services/poi_monitor/README.md @@ -0,0 +1,204 @@ +# POI Monitor Service + +## Description + +A service that monitors Proof of Indexing (POI) submissions across indexers in The Graph network, +detecting discrepancies and POI reuse and forwarding notifications to Slack when an issue is detected. 
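+
+Conceptually, a discrepancy is simply more than one distinct POI hash submitted
+for the same (deployment, block) pair. A minimal sketch of that core check
+(illustrative only — the name `has_discrepancy` is not in the codebase; the
+real logic lives in `src/analyzer.py`):
+
+```python
+# `submissions` maps each POI hash to the set of indexer addresses that
+# submitted it, e.g. {"0xabc...": {"0x123...", "0x456..."}}.
+def has_discrepancy(submissions: dict[str, set[str]]) -> bool:
+    # One unique hash means every indexer agrees; two or more is a discrepancy.
+    return len(submissions) > 1
+```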
+ +## Features + +- Monitors POI submissions in real-time +- Detects when indexers submit different POIs for the same deployment/block +- Identifies POI reuse across different deployments/blocks +- Sends notifications to Slack when issues are found +- Prevents duplicate notifications for known issues +- Uses connection pooling for efficient database access +- Handles service restarts gracefully + +## Setup + +### Prerequisites + +- Python 3.12+ +- PostgreSQL +- Docker (for containerized deployment) +- Slack webhook URL + +### Environment Variables + +Create a `.env` file with: +``` +POSTGRES_DB=graphix +POSTGRES_USER=postgres +POSTGRES_PASSWORD=password +POSTGRES_HOST=localhost +POSTGRES_PORT=5433 +GRAPHIX_API_URL=http://localhost:8000/graphql +SLACK_WEBHOOK_URL=your_webhook_url_here # Required to run the service for notifications in Slack +CHECK_INTERVAL=300 # seconds between checks +``` + +### Local Development + +1. Create a virtual environment: +```bash +# Create and activate virtual environment +python -m venv venv +# For Git Bash on Windows: +source venv/Scripts/activate +# For Linux/Mac: +source venv/bin/activate +``` + +2. Install dependencies: +```bash +# With virtual environment activated: +pip install -r services/poi_monitor/requirements.txt +pip install -r services/poi_monitor/requirements-dev.txt # For development/testing +# Install package in editable mode (for development tools like pytest, mypy, etc.) +pip install -e services/poi_monitor +``` + +## Running the Service + +### Using Docker Compose + +The POI monitor requires several steps to run: + +```bash +# 1. Start postgres database +docker-compose -f compose/dev.dependencies.yml up -d postgres +# pgAdmin 4 should now be able to connect to the database + +# 2. Before we start the graphix service, we need to set the database URL: +export GRAPHIX_DB_URL=postgresql://postgres:password@localhost:5433/graphix + +# 3. Build and start the graphix service +cargo build +./target/debug/graphix --database-url postgresql://postgres:password@localhost:5433/graphix +# The graphix service should now be accessible at http://localhost:8000/graphql + +# 4. 
In a separate Git Bash terminal, build and start the POI monitor: +docker-compose -f compose/dev.dependencies.yml build poi-monitor # Build the POI monitor service +docker-compose -f compose/dev.dependencies.yml up poi-monitor # Start the POI monitor service +``` + +### Running Tests + +```bash +pytest tests/ --cov=src --cov-report=term-missing +``` + +### Project Structure +``` +graphix/ # Root project directory +├── Dockerfile # Main graphix service Dockerfile +├── compose/ +│ ├── dependencies.yml # Base services configuration +│ ├── dev.dependencies.yml # POI monitor service configuration +│ └── .env # Environment variables for Docker +└── services/ + └── poi_monitor/ # POI Monitor service + ├── Dockerfile # POI monitor service Dockerfile + ├── src/ # Source code + ├── tests/ # Test files + ├── migrations/ # Database migrations + └── .env # Local environment variables +``` + +## Restarting the Service + +Note: After making code changes, rebuild and restart the service: +```bash +docker-compose -f compose/dev.dependencies.yml build poi-monitor +docker-compose -f compose/dev.dependencies.yml up poi-monitor +``` + +For restarting without code changes (e.g., after updating environment variables): +```bash +docker-compose -f compose/dev.dependencies.yml restart poi-monitor +``` + +## Notifications + +When a POI discrepancy is detected, a Slack message is sent with: +- Deployment CID and block number +- Different POI hashes submitted +- Which indexers submitted each POI +- Any instances of POI reuse from other deployments + +Example notification: +``` +🚨 *POI Discrepancy Detected* +*Deployment:* QmYyB6sr2366Vw2mcWBXy2pTwqJKNkqyZnmxPeQJGHyXav +*Block:* 1234567 + +*Submitted POIs:* +• Hash: 0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef +*Submitted by:* 0x1234567890abcdef1234567890abcdef12345678 + +• Hash: 0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890 +Previously used 5 days ago: +• Network: mainnet +• Deployment: QmXyz789abcdef1234567890abcdef1234567890abcdef1234567890abcd +• Block: 1234000 +• Indexer: 0xabcdef1234567890abcdef1234567890abcdef1234 +``` + +### Technical Implementation +The POI monitor service compares POIs at the deployment level, where POIs for each deployment are only compared +when they share the same block height. This means we can effectively compare POIs across different networks, even +when their chain heads are at vastly different heights (e.g., Arbitrum at block 15M vs Ethereum at block 5M). +What matters is that for any given deployment, all POIs being compared were generated at the same block height +for that specific deployment. + +1. Deployment-Scoped Comparisons: + ```python + def analyze_pois(self, deployment_id: str, block_number: int): + # POIs are only compared within the same [deployment, block-height] combination + ``` + +2. Independent Block Heights: + - Each deployment is processed independently + - Block numbers are compared within the same deployment + - Different networks (e.g., Arbitrum at block 15M vs Ethereum at block 5M) + are handled correctly because their deployments are different + +3. 
POI Reuse Detection:
+   - Checks for POI reuse by comparing POI hashes across all deployments
+   - Does not explicitly differentiate between networks
+   - Deployment IDs are naturally network-specific because they include:
+     - The network configuration in the manifest
+     - Network-specific data sources
+     - Other chain-specific parameters
+
+### Limitations
+
+The POI monitor service focuses on the most recent POI submissions and has some limitations regarding historical data:
+
+- The service processes POIs at the latest block heights it finds for each deployment.
+- If multiple discrepancies exist across different block heights, only the recent ones will be detected.
+- Therefore, historical discrepancies may be missed if they occurred during service downtime.
+
+For example:
+- The service is live and running from block 0 to block 220,000,000
+- The service then goes offline for a large number of blocks, e.g. 100,000,000 blocks
+- Suppose POI discrepancies exist at blocks:
+  - 230,000,000
+  - 240,000,000
+  - 250,000,000
+  ...
+  - 310,000,000
+  - 320,000,000
+- The service will only detect and notify about discrepancies at the latest blocks (e.g., 310M, 320M).
+- Historical discrepancies at blocks 230M, 240M, 250M, etc. will not be detected
+
+This behavior is acceptable for normal operation because:
+1. Epochs typically last 24 hours
+2. The service checks for discrepancies every few minutes (configurable via CHECK_INTERVAL)
+3. Under normal operation, no discrepancies should be missed
+
+However, developers should be aware that:
+- If the service experiences downtime
+- And POI discrepancies occur during that downtime
+- Those historical discrepancies might not be detected when the service resumes
diff --git a/services/poi_monitor/migrations/001_create_notifications_table.py b/services/poi_monitor/migrations/001_create_notifications_table.py
new file mode 100644
index 0000000..edbf886
--- /dev/null
+++ b/services/poi_monitor/migrations/001_create_notifications_table.py
@@ -0,0 +1,49 @@
+import logging
+from typing import Optional
+import psycopg2
+
+logger = logging.getLogger(__name__)
+
+def migrate_up(conn: Optional[psycopg2.extensions.connection] = None) -> None:
+    """Create the poi_notifications table."""
+    
+    up_sql = """
+    CREATE TABLE IF NOT EXISTS poi_notifications (
+        id SERIAL PRIMARY KEY,
+        deployment_id TEXT NOT NULL,
+        block_number BIGINT NOT NULL,
+        message TEXT NOT NULL,
+        sent_at TIMESTAMP NOT NULL DEFAULT NOW(),
+        UNIQUE(deployment_id, block_number)
+    );
+
+    CREATE INDEX IF NOT EXISTS idx_poi_notifications_sent_at
+    ON poi_notifications(sent_at);
+    """
+    
+    try:
+        with conn.cursor() as cur:
+            cur.execute(up_sql)
+            conn.commit()
+            logger.info("Successfully created poi_notifications table")
+    except Exception as e:
+        conn.rollback()
+        logger.error(f"Failed to create poi_notifications table: {str(e)}")
+        raise
+
+def migrate_down(conn: Optional[psycopg2.extensions.connection] = None) -> None:
+    """Remove the poi_notifications table."""
+    
+    down_sql = """
+    DROP TABLE IF EXISTS poi_notifications;
+    """
+    
+    try:
+        with conn.cursor() as cur:
+            cur.execute(down_sql)
+            conn.commit()
+            logger.info("Successfully dropped poi_notifications table")
+    except Exception as e:
+        conn.rollback()
+        logger.error(f"Failed to drop poi_notifications table: {str(e)}")
+        raise
\ No newline at end of file
diff --git a/services/poi_monitor/migrations/003_add_poi_set_column.sql b/services/poi_monitor/migrations/003_add_poi_set_column.sql
new file mode 100644
index 0000000..0a50365
--- /dev/null
+++ 
b/services/poi_monitor/migrations/003_add_poi_set_column.sql @@ -0,0 +1,12 @@ +-- Add poi_set column +ALTER TABLE poi_notifications +ADD COLUMN IF NOT EXISTS poi_set BYTEA[]; + +-- Update existing rows with empty array +UPDATE poi_notifications +SET poi_set = ARRAY[]::bytea[] +WHERE poi_set IS NULL; + +-- Make column not null after filling defaults +ALTER TABLE poi_notifications +ALTER COLUMN poi_set SET NOT NULL; \ No newline at end of file diff --git a/services/poi_monitor/requirements-dev.txt b/services/poi_monitor/requirements-dev.txt new file mode 100644 index 0000000..6070917 --- /dev/null +++ b/services/poi_monitor/requirements-dev.txt @@ -0,0 +1,3 @@ +pytest==8.0.0 +pytest-cov==4.1.0 +pytest-mock==3.12.0 \ No newline at end of file diff --git a/services/poi_monitor/requirements.txt b/services/poi_monitor/requirements.txt new file mode 100644 index 0000000..759d70b --- /dev/null +++ b/services/poi_monitor/requirements.txt @@ -0,0 +1,6 @@ +requests==2.31.0 +python-dotenv==1.0.0 +psycopg2-binary==2.9.9 +prometheus-client==0.19.0 +pyyaml==6.0.1 +python-json-logger==2.0.7 diff --git a/services/poi_monitor/setup.py b/services/poi_monitor/setup.py new file mode 100644 index 0000000..b69e48f --- /dev/null +++ b/services/poi_monitor/setup.py @@ -0,0 +1,15 @@ +from setuptools import setup, find_packages + +setup( + name="poi_monitor", + version="0.1", + packages=find_packages(), + install_requires=[ + "requests", + "python-dotenv", + "psycopg2-binary", + "prometheus-client", + "pyyaml", + "python-json-logger", + ], +) \ No newline at end of file diff --git a/services/poi_monitor/src/__init__.py b/services/poi_monitor/src/__init__.py new file mode 100644 index 0000000..c7d9a6d --- /dev/null +++ b/services/poi_monitor/src/__init__.py @@ -0,0 +1,21 @@ +""" +POI Monitor Service + +A service that monitors Proof of Indexing (POI) submissions across indexers in The Graph network, +detecting discrepancies and POI reuse. + +Components: +- analyzer: POI discrepancy detection +- database: PostgreSQL connection management +- notification: Slack integration +- monitor: Main service loop +- migration: Database schema management +""" + +from .analyzer import PoiAnalyzer +from .database import Database +from .notification import SlackNotifier +from .monitor import main + +__version__ = "0.1.0" +__all__ = ['PoiAnalyzer', 'Database', 'SlackNotifier', 'main'] diff --git a/services/poi_monitor/src/analyzer.py b/services/poi_monitor/src/analyzer.py new file mode 100644 index 0000000..38819eb --- /dev/null +++ b/services/poi_monitor/src/analyzer.py @@ -0,0 +1,291 @@ +# Import external libraries +import logging +from typing import Dict, Set, List, Optional, Tuple +from datetime import datetime, timedelta +import os +import requests + +# Import internal modules +from .database import Database +from .notification import SlackNotifier + +# Configure logging +logger = logging.getLogger(__name__) + +class PoiAnalyzer: + """Class to analyze POI submissions and notify slack if discrepancies are found.""" + + def __init__(self, database: Database, notifier: SlackNotifier): + """Initialize the POI analyzer with a database and notifier. + These are internal modules that are injected into the class.""" + self.db = database + self.notifier = notifier + self.page_size = 100 # Default page size for pagination + + def analyze_pois(self, deployment_id: str, block_number: int) -> Optional[Dict]: + """Analyze POI submissions and detect discrepancies between indexers. 
+ + This method checks if different indexers have submitted different POI (Proof of Indexing) + hashes for the same deployment and block. A discrepancy indicates that indexers + disagree about the correct POI value. + + Params: + deployment_id: The deployment CID to analyze + block_number: The blockchain block number to analyze + + Returns: + Optional[Dict]: Returns None if: + - Notification was already sent + - No POI submissions exist + - Only one unique POI hash exists (no discrepancy) + + Returns a dictionary containing discrepancy details if multiple different + POI hashes are found, with structure: + { + 'deployment_cid': str, + 'block_number': int, + 'submissions': Dict[str, Set[str]], # POI hash -> set of indexer addresses + 'reuse_info': Dict[str, List[str]] # POI reuse history + } + + Raises: + Exception: If there's an error accessing the database or processing submissions + """ + try: + # Skip if we've already notified about this deployment/block + if self.db.check_notification_sent(deployment_id, block_number): + logger.debug(f"Already notified about {deployment_id} at block {block_number}") + return None + + # Get all POI submissions for this deployment/block + poi_submissions = self.db.get_latest_pois(deployment_id, block_number) + + if not poi_submissions: + logger.debug(f"No POI submissions found for {deployment_id} at block {block_number}") + return None + + # If there's only one POI hash, there's no discrepancy + if len(poi_submissions) == 1: + logger.debug(f"All indexers agree on POI for {deployment_id} at block {block_number}") + return None + + # We have a discrepancy - format the data + discrepancy_data = { + 'deployment_cid': deployment_id, + 'block_number': block_number, + 'submissions': poi_submissions, + 'reuse_info': self._check_poi_reuse(poi_submissions) + } + + return discrepancy_data + + except Exception as e: + logger.error(f"Error analyzing POIs: {str(e)}", exc_info=True) + raise + + def _check_poi_reuse(self, submissions: Dict[str, Set[str]]) -> Dict[str, List[str]]: + """Check if any POIs have been reused from other blocks/deployments. 
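+
+        A POI hash that appears for more than one (deployment, block) pair is
+        treated as reused, and each earlier use is reported with its network,
+        deployment, block, indexer, and how many days ago it was submitted.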
+ + Params: + submissions: Dictionary mapping POI hashes to sets of indexer addresses + + Returns: + Dict mapping POI hashes to lists of reuse information + """ + reuse_info = {} + + query = """ + SELECT + p.poi, + d.ipfs_cid as deployment_id, + b.number as block_number, + i.address as indexer_address, + n.name as network_name, + p.created_at as submission_time + FROM pois p + JOIN sg_deployments d ON d.id = p.sg_deployment_id + JOIN blocks b ON b.id = p.block_id + JOIN indexers i ON i.id = p.indexer_id + JOIN networks n ON n.id = d.network + WHERE p.poi = ANY(%s) + ORDER BY p.created_at DESC + """ + + try: + with self.db.get_connection() as conn: + with conn.cursor() as cur: + poi_hashes = list(submissions.keys()) + cur.execute(query, (poi_hashes,)) + results = cur.fetchall() + + # Group results by POI hash + poi_occurrences = {} + for row in results: + (poi_hash, deployment_id, block_number, + indexer_addr, network, timestamp) = row + if poi_hash not in poi_occurrences: + poi_occurrences[poi_hash] = [] + poi_occurrences[poi_hash].append({ + 'deployment_id': deployment_id, + 'block_number': block_number, + 'indexer_address': indexer_addr, + 'network': network, + 'timestamp': timestamp + }) + + # Format detailed reuse information + for poi_hash, occurrences in poi_occurrences.items(): + if len(occurrences) > 1: # POI appears more than once + reuse_info[poi_hash] = [] + + # Sort by timestamp descending + occurrences.sort(key=lambda x: x['timestamp'], reverse=True) + + # First occurrence is current + current = occurrences[0] + + # Add details for each previous use + for prev in occurrences[1:]: + time_diff = current['timestamp'] - prev['timestamp'] + days_ago = time_diff.days + + reuse_info[poi_hash].append( + f"Previously used {days_ago} days ago:\n" + f"• Network: {prev['network']}\n" + f"• Deployment: {prev['deployment_id']}\n" + f"• Block: {prev['block_number']}\n" + f"• Indexer: {prev['indexer_address']}" + ) + + return reuse_info + + except Exception as e: + logger.error(f"Error checking POI reuse: {str(e)}", exc_info=True) + return {} + + def process_new_submissions(self) -> None: + """Process any new POI submissions and send notifications for discrepancies.""" + try: + recent_submissions = self._get_recent_submissions() + + for deployment_id, block_number in recent_submissions: + try: + discrepancy = self.analyze_pois(deployment_id, block_number) + + if discrepancy: + # Format and send notification + message = self.notifier.format_poi_discrepancy_message(discrepancy) + if self.notifier.send_notification(message): + # Record that we sent the notification + self.db.record_notification( + deployment_id, + block_number, + message + ) + except Exception as e: + # Log the error but continue processing other submissions + logger.error(f"Error processing submission for deployment {deployment_id} at block {block_number}: {str(e)}", exc_info=True) + continue + + # Cleanup old POI notifications from the database + self.db.cleanup_old_notifications(days=60) + + except Exception as e: + logger.error(f"Error processing submissions: {str(e)}", exc_info=True) + raise + + def _get_recent_submissions(self) -> List[tuple[str, int]]: + """Get list of recent deployment/block combinations to check.""" + graphql_url = os.getenv('GRAPHIX_API_URL', 'http://localhost:8000/graphql') + submissions = set() + + query = """ + query { + poiAgreementRatios( + indexerAddress: "%s" + ) { + poi { + hash + block { + number + } + deployment { + cid + } + indexer { + address + } + } + } + } + """ + + try: + indexers = 
self._get_indexers() + if not indexers: + logger.error("No indexers found") + return [] + + for indexer_address in indexers: + logger.debug(f"Fetching POIs for indexer {indexer_address}") + current_query = query % indexer_address + + response = requests.post( + graphql_url, + json={'query': current_query}, + timeout=10 + ) + response.raise_for_status() + data = response.json() + + if 'errors' in data: + logger.error(f"GraphQL errors: {data['errors']}") + break + + if 'data' not in data or 'poiAgreementRatios' not in data['data']: + logger.error("Unexpected GraphQL response format") + break + + # Extract POIs from current page + agreements = data['data']['poiAgreementRatios'] + for agreement in agreements: + submissions.add(( + agreement['poi']['deployment']['cid'], + agreement['poi']['block']['number'] + )) + + return list(submissions) + + except requests.exceptions.RequestException as e: + logger.error(f"Failed to fetch recent submissions: {str(e)}") + return [] + + def _get_indexers(self) -> List[str]: + """Get list of indexer addresses.""" + query = """ + query { + indexers( + limit: %d + ) { + address + } + } + """ + try: + all_indexers = [] + current_query = query % self.page_size + logger.debug(f"Fetching indexers with limit {self.page_size}") + response = requests.post( + os.getenv('GRAPHIX_API_URL'), + json={'query': current_query}, + timeout=10 + ) + data = response.json() + if 'data' in data and 'indexers' in data['data']: + indexers = data['data']['indexers'] + all_indexers.extend([indexer['address'] for indexer in indexers]) + return all_indexers + return [] + except Exception as e: + logger.error(f"Error getting indexers: {str(e)}") + return [] diff --git a/services/poi_monitor/src/database.py b/services/poi_monitor/src/database.py new file mode 100644 index 0000000..14fe130 --- /dev/null +++ b/services/poi_monitor/src/database.py @@ -0,0 +1,194 @@ +import os +import logging +from typing import Dict, Set +import psycopg2 +from psycopg2.pool import SimpleConnectionPool +from contextlib import contextmanager +from dotenv import load_dotenv + +logger = logging.getLogger(__name__) +load_dotenv() + +class Database: + """ + This class is the Database manager for POI monitoring. + + This class manages database connections and provides methods for: + - Tracking POI submissions across indexers + - Managing slack notification history + - Handling database migrations (schema updates) + """ + + def __init__(self): + """ + Constructor initializes database connection and runs migrations. 
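+
+        A pool of 1-10 connections is created from the POSTGRES_* environment
+        variables, and one connection is borrowed up front to apply any
+        pending migrations before the service starts handling work.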
+
+        Raises:
+            psycopg2.Error: If database connection cannot be established
+            Exception: If migrations fail to apply
+        """
+        # Create the connection pool first
+        self.pool = SimpleConnectionPool(
+            minconn=1,
+            maxconn=10,
+            dbname=os.getenv('POSTGRES_DB', 'graphix'),
+            user=os.getenv('POSTGRES_USER', 'postgres'),
+            password=os.getenv('POSTGRES_PASSWORD', 'password'),
+            host=os.getenv('POSTGRES_HOST', 'localhost'),
+            port=os.getenv('POSTGRES_PORT', '5433')
+        )
+
+        # Get initial connection for migrations
+        with self.get_connection() as conn:
+            self.conn = conn  # Store temporary reference for migrations
+            self._run_migrations()
+            self.conn = None  # Remove reference after migrations
+
+    @contextmanager
+    def get_connection(self):
+        """
+        Get a database connection from the pool with automatic cleanup (this is faster
+        than creating a new connection each time we talk to the database).
+
+        - Automatically returns connection to pool after use
+        - Handles cleanup even if an exception occurs
+        """
+        conn = self.pool.getconn()
+        try:
+            yield conn
+        finally:
+            self.pool.putconn(conn)
+
+    def get_latest_pois(self, deployment_id: str, block_number: int) -> Dict[str, Set[str]]:
+        """Fetch all indexer POI submissions for a specific deployment and block.
+
+        Retrieves all POI submissions and the indexers that submitted them for
+        a given deployment at a specific block number.
+
+        Params:
+            deployment_id: The IPFS CID of the subgraph deployment
+            block_number: The block number to verify POIs against
+
+        Returns:
+            A dictionary mapping POI hashes to sets of indexer addresses.
+            Example:
+            {
+                "0xabc...": {"0x123...", "0x456..."},
+                "0xdef...": {"0x789..."}
+            }
+        """
+        query = """
+            SELECT p.poi, i.address
+            FROM pois p
+            JOIN indexers i ON i.id = p.indexer_id
+            JOIN blocks b ON b.id = p.block_id
+            JOIN sg_deployments d ON d.id = p.sg_deployment_id
+            WHERE d.ipfs_cid = %s AND b.number = %s
+        """
+
+        with self.get_connection() as conn:
+            with conn.cursor() as cur:
+                cur.execute(query, (deployment_id, block_number))
+                results = cur.fetchall()
+
+                poi_submissions = {}
+                for poi_hash, indexer_addr in results:
+                    if poi_hash not in poi_submissions:
+                        poi_submissions[poi_hash] = set()
+                    poi_submissions[poi_hash].add(indexer_addr)
+
+                return poi_submissions
+
+    def check_notification_sent(self, deployment_id: str, block_number: int) -> bool:
+        """Check if we've already notified about the current set of POIs for this deployment/block.
+
+        Params:
+            deployment_id: The deployment CID
+            block_number: The block number
+
+        Returns:
+            bool: True if we've already notified about these exact POIs
+        """
+        query = """
+            WITH current_pois AS (
+                SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set
+                FROM pois p
+                JOIN blocks b ON b.id = p.block_id
+                JOIN sg_deployments d ON d.id = p.sg_deployment_id
+                WHERE d.ipfs_cid = %s AND b.number = %s
+            )
+            SELECT EXISTS (
+                SELECT 1
+                FROM poi_notifications n
+                CROSS JOIN current_pois c
+                WHERE n.deployment_id = %s
+                AND n.block_number = %s
+                AND n.poi_set = c.poi_set
+            )
+        """
+
+        with self.get_connection() as conn:
+            with conn.cursor() as cur:
+                cur.execute(query, (deployment_id, block_number, deployment_id, block_number))
+                return cur.fetchone()[0]
+
+    def record_notification(self, deployment_id: str, block_number: int, message: str) -> None:
+        """Record that a notification was sent. Later used to prevent duplicate notifications/spam.
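+
+        The current set of distinct POIs for the deployment/block is stored
+        alongside the message, so a later change in the POI set produces a
+        fresh notification instead of being suppressed as a duplicate.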
+ + Params: + deployment_id: The deployment IPFS hash + block_number: The block number + message: The notification message that was sent + """ + + query = """ + WITH current_pois AS ( + SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set + FROM pois p + JOIN blocks b ON b.id = p.block_id + JOIN sg_deployments d ON d.id = p.sg_deployment_id + WHERE d.ipfs_cid = %s AND b.number = %s + ) + INSERT INTO poi_notifications (deployment_id, block_number, message, sent_at, poi_set) + SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] + FROM current_pois c + """ + + with self.get_connection() as conn: + with conn.cursor() as cur: + cur.execute(query, (deployment_id, block_number, deployment_id, block_number, message)) + conn.commit() + + def cleanup_old_notifications(self, days: int = 60) -> None: + """Remove notification records older than specified days.""" + query = """ + DELETE FROM poi_notifications + WHERE sent_at < NOW() - INTERVAL '%s days' + """ + + with self.get_connection() as conn: + with conn.cursor() as cur: + cur.execute(query, (days,)) + conn.commit() + + def _run_migrations(self): + """Run any pending database migrations.""" + from .migration import MigrationManager + + try: + manager = MigrationManager(self.conn) + manager.apply_migrations() + + # Verify table structure + with self.conn.cursor() as cur: + cur.execute(""" + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = 'poi_notifications' + """) + columns = cur.fetchall() + logger.info(f"poi_notifications table structure: {columns}") + + except Exception as e: + logger.error(f"Failed to run migrations: {str(e)}") + raise diff --git a/services/poi_monitor/src/migration.py b/services/poi_monitor/src/migration.py new file mode 100644 index 0000000..b588fad --- /dev/null +++ b/services/poi_monitor/src/migration.py @@ -0,0 +1,82 @@ +import os +import logging +import importlib +from typing import List +import psycopg2 + +logger = logging.getLogger(__name__) + +class MigrationManager: + def __init__(self, conn: psycopg2.extensions.connection): + self.conn = conn + self._ensure_migration_table() + + def _ensure_migration_table(self): + """Create the migrations tracking table if it doesn't exist.""" + sql = """ + CREATE TABLE IF NOT EXISTS poi_monitor_migrations ( + id SERIAL PRIMARY KEY, + migration_name TEXT NOT NULL UNIQUE, + applied_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + """ + with self.conn.cursor() as cur: + cur.execute(sql) + self.conn.commit() + + def get_applied_migrations(self) -> List[str]: + """Get list of already applied migrations.""" + sql = "SELECT migration_name FROM poi_monitor_migrations ORDER BY id;" + with self.conn.cursor() as cur: + cur.execute(sql) + return [row[0] for row in cur.fetchall()] + + def apply_migrations(self): + """Apply all pending migrations.""" + applied = set(self.get_applied_migrations()) + + # Get all migration files + migrations_dir = os.path.join(os.path.dirname(__file__), '..', 'migrations') + logger.info(f"Looking for migrations in: {migrations_dir}") + migration_files = sorted([ + f for f in os.listdir(migrations_dir) + if (f.endswith('.py') and f != '__init__.py') or f.endswith('.sql') + ]) + logger.info(f"Found migration files: {migration_files}") + + for migration_file in migration_files: + migration_name = migration_file[:-3] + + if migration_name in applied: + logger.info(f"Skipping already applied migration: {migration_name}") + continue + + logger.info(f"Applying migration: {migration_name}") + + try: + if migration_file.endswith('.sql'): + # 
Handle SQL files + with open(os.path.join(migrations_dir, migration_file)) as f: + sql = f.read() + with self.conn.cursor() as cur: + cur.execute(sql) + self.conn.commit() + else: + # Import and run Python migrations + module = importlib.import_module(f"migrations.{migration_name}") + module.migrate_up(self.conn) + + # Record the migration + with self.conn.cursor() as cur: + cur.execute( + "INSERT INTO poi_monitor_migrations (migration_name) VALUES (%s)", + (migration_name,) + ) + self.conn.commit() + + logger.info(f"Successfully applied migration: {migration_name}") + + except Exception as e: + self.conn.rollback() + logger.error(f"Failed to apply migration {migration_name}: {str(e)}") + raise \ No newline at end of file diff --git a/services/poi_monitor/src/monitor.py b/services/poi_monitor/src/monitor.py new file mode 100644 index 0000000..bc11c4f --- /dev/null +++ b/services/poi_monitor/src/monitor.py @@ -0,0 +1,64 @@ +import os +import time +import logging +from logging.config import dictConfig +from dotenv import load_dotenv +from .database import Database +from .notification import SlackNotifier +from .analyzer import PoiAnalyzer + +# Load environment variables +load_dotenv() + +# Configure logging +logging_config = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'json': { + '()': 'pythonjsonlogger.jsonlogger.JsonFormatter', + 'format': '%(asctime)s %(levelname)s %(name)s %(message)s' + } + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'json' + } + }, + 'root': { + 'handlers': ['console'], + 'level': 'INFO' + } +} + +dictConfig(logging_config) +logger = logging.getLogger(__name__) + +def main(): + logger.info("Starting POI Monitor service...") + + # Initialize components + try: + db = Database() + notifier = SlackNotifier() + analyzer = PoiAnalyzer(db, notifier) + logger.info("Successfully initialized all components") + except Exception as e: + logger.error(f"Failed to initialize components: {str(e)}") + raise + + try: + while True: + logger.info("Running POI check iteration") + analyzer.process_new_submissions() + time.sleep(int(os.getenv('CHECK_INTERVAL', 300))) # Default 5 minutes + + except KeyboardInterrupt: + logger.info("Shutting down POI Monitor service...") + except Exception as e: + logger.error(f"Unexpected error: {str(e)}", exc_info=True) + raise + +if __name__ == "__main__": + main() diff --git a/services/poi_monitor/src/notification.py b/services/poi_monitor/src/notification.py new file mode 100644 index 0000000..eb8f0a5 --- /dev/null +++ b/services/poi_monitor/src/notification.py @@ -0,0 +1,89 @@ +import os +import logging +import requests +from typing import Dict, Any + +logger = logging.getLogger(__name__) + +class SlackNotifier: + """This class is used to send notifications to a Slack channel.""" + def __init__(self, webhook_url: str = None): + """ + Initializes the SlackNotifier with a webhook URL. + + Params: + webhook_url: The Slack webhook URL + + Raises: + ValueError: If the Slack webhook URL is not provided + """ + self.webhook_url = webhook_url or os.getenv('SLACK_WEBHOOK_URL') + if not self.webhook_url: + raise ValueError("Slack webhook URL not provided") + + def send_notification(self, message: str) -> bool: + """Send a notification to Slack. 
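+
+        Posts the message as JSON to the configured webhook URL with a
+        10-second timeout. Failures are logged and reported via the return
+        value rather than raised.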
+ + Params: + message: The formatted message to send + + Returns: + bool: True if the message was sent successfully + """ + try: + response = requests.post( + self.webhook_url, + json={"text": message}, + timeout=10 + ) + response.raise_for_status() + logger.info("Successfully sent Slack notification") + return True + + except requests.exceptions.RequestException as e: + logger.error(f"Failed to send Slack notification: {str(e)}") + return False + + def format_poi_discrepancy_message(self, data: Dict[str, Any]) -> str: + """Format POI discrepancy data into a Slack message. + + Params: + data: Dictionary containing POI discrepancy information + + Returns: + str: Formatted message ready to send to Slack + """ + message_parts = [ + "🚨 *New POI Discrepancy Found*", + f"*Deployment:* `{data['deployment_cid']}`", + f"*Block:* `{data['block_number']}`", + "*POI Submissions:*" + ] + + for poi_hash, indexers in data['submissions'].items(): + # Convert memoryview to hex string if needed + if isinstance(poi_hash, memoryview): + poi_hash = poi_hash.hex() + + submission_parts = [f"*POI Hash:* `{poi_hash}`"] + + # Add POI reuse information if available + if 'reuse_info' in data and poi_hash in data['reuse_info']: + reuse_data = data['reuse_info'][poi_hash] + submission_parts.append("⚠️ *POI Reuse:*") + for detail in reuse_data: + submission_parts.append(f" • {detail}") + + # Convert indexer addresses if they're memoryview + formatted_indexers = [] + for indexer in indexers: + if isinstance(indexer, memoryview): + formatted_indexers.append(indexer.hex()) + else: + formatted_indexers.append(str(indexer)) + + submission_parts.append(f"*Submitted by:* `{', '.join(sorted(formatted_indexers))}`") + message_parts.extend(submission_parts) + message_parts.append("") # Add spacing between submissions + + return "\n".join(message_parts) From a2b5e9710aaf8d0481cd0bae452573ee56304cec Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Fri, 10 Jan 2025 15:32:06 +0000 Subject: [PATCH 2/5] Create tests --- services/poi_monitor/README.md | 3 +- services/poi_monitor/tests/test_analyzer.py | 174 ++++++++++++++++++ services/poi_monitor/tests/test_database.py | 114 ++++++++++++ services/poi_monitor/tests/test_migration.py | 38 ++++ services/poi_monitor/tests/test_monitor.py | 69 +++++++ .../poi_monitor/tests/test_notification.py | 49 +++++ 6 files changed, 446 insertions(+), 1 deletion(-) create mode 100644 services/poi_monitor/tests/test_analyzer.py create mode 100644 services/poi_monitor/tests/test_database.py create mode 100644 services/poi_monitor/tests/test_migration.py create mode 100644 services/poi_monitor/tests/test_monitor.py create mode 100644 services/poi_monitor/tests/test_notification.py diff --git a/services/poi_monitor/README.md b/services/poi_monitor/README.md index 85c1bf4..3a6d1e4 100644 --- a/services/poi_monitor/README.md +++ b/services/poi_monitor/README.md @@ -69,6 +69,7 @@ The POI monitor requires several steps to run: # 1. Start postgres database docker-compose -f compose/dev.dependencies.yml up -d postgres # pgAdmin 4 should now be able to connect to the database +# localhost:8000/graphql should not be accessible yet # 2. 
Before we start the graphix service, we need to set the database URL: export GRAPHIX_DB_URL=postgresql://postgres:password@localhost:5433/graphix @@ -86,7 +87,7 @@ docker-compose -f compose/dev.dependencies.yml up poi-monitor # Start the POI mo ### Running Tests ```bash -pytest tests/ --cov=src --cov-report=term-missing +pytest services/poi_monitor/tests/ --cov=services/poi_monitor/src --cov-report=term-missing ``` ### Project Structure diff --git a/services/poi_monitor/tests/test_analyzer.py b/services/poi_monitor/tests/test_analyzer.py new file mode 100644 index 0000000..10a73dc --- /dev/null +++ b/services/poi_monitor/tests/test_analyzer.py @@ -0,0 +1,174 @@ +import pytest +from datetime import datetime +from unittest.mock import Mock, patch, MagicMock +from src.analyzer import PoiAnalyzer +from src.database import Database +from src.notification import SlackNotifier +import requests + +@pytest.fixture +def mock_db(): + db = Mock(spec=Database) + # Create a context manager mock + context_mock = MagicMock() + mock_conn = MagicMock() + mock_cursor = MagicMock() + + # Set up the context manager chain + db.get_connection.return_value = context_mock + context_mock.__enter__.return_value = mock_conn + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + # Store cursor on db for tests that need it + db._test_cursor = mock_cursor + return db # Return just the db, not the tuple + +@pytest.fixture +def mock_notifier(): + return Mock(spec=SlackNotifier) + +@pytest.fixture +def analyzer(mock_db, mock_notifier): + return PoiAnalyzer(mock_db, mock_notifier) + +def test_analyze_pois_no_discrepancy(analyzer, mock_db): + # Setup + deployment_id = "Qm123" + block_number = 1000 + + # Mock database responses + mock_db.check_notification_sent.return_value = False + mock_db.get_latest_pois.return_value = { + "poi_hash_1": {"indexer1", "indexer2"} # Single POI hash = no discrepancy + } + + # Execute + result = analyzer.analyze_pois(deployment_id, block_number) + + # Assert + assert result is None + mock_db.check_notification_sent.assert_called_once_with(deployment_id, block_number) + mock_db.get_latest_pois.assert_called_once_with(deployment_id, block_number) + +def test_analyze_pois_with_discrepancy(analyzer, mock_db): + # Setup + deployment_id = "Qm123" + block_number = 1000 + + # Mock database responses + mock_db.check_notification_sent.return_value = False + mock_db.get_latest_pois.return_value = { + "poi_hash_1": {"indexer1"}, + "poi_hash_2": {"indexer2"} # Two different POI hashes = discrepancy + } + + # Execute + result = analyzer.analyze_pois(deployment_id, block_number) + + # Assert + assert result is not None + assert result["deployment_cid"] == deployment_id + assert result["block_number"] == block_number + assert result["submissions"] == mock_db.get_latest_pois.return_value + +def test_check_poi_reuse(analyzer): + """Test POI reuse detection.""" + # Setup + submissions = { + "poi_hash_1": {"indexer1"}, + "poi_hash_2": {"indexer2"} + } + + # Mock database responses + mock_cursor = analyzer.db._test_cursor + mock_cursor.fetchall.return_value = [ + # Match the columns from the query: + # poi, deployment_id, block_number, indexer_address, network_name, submission_time + ("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", datetime.now()), + ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", datetime.now()) + ] + + # Execute + result = analyzer._check_poi_reuse(submissions) + + # Assert + assert "poi_hash_1" in result + assert len(result["poi_hash_1"]) == 1 + assert 
"Previously used" in result["poi_hash_1"][0] + +def test_analyze_pois_already_notified(analyzer, mock_db): + """Test that we don't re-notify about known discrepancies.""" + deployment_id = "Qm123" + block_number = 1000 + + mock_db.check_notification_sent.return_value = True + + result = analyzer.analyze_pois(deployment_id, block_number) + assert result is None + mock_db.get_latest_pois.assert_not_called() + +def test_analyze_pois_no_submissions(analyzer, mock_db): + """Test handling of blocks with no POI submissions.""" + deployment_id = "Qm123" + block_number = 1000 + + mock_db.check_notification_sent.return_value = False + mock_db.get_latest_pois.return_value = {} + + result = analyzer.analyze_pois(deployment_id, block_number) + assert result is None + +def test_process_new_submissions_handles_errors(analyzer, mock_db): + """Test error handling in the main processing loop.""" + # Mock _get_recent_submissions to return some test data + analyzer._get_recent_submissions = Mock(return_value=[ + ("Qm123", 1000), + ("Qm456", 2000) + ]) + + # Make analyze_pois raise an exception for the second submission + def mock_analyze(deployment_id, block_number): + if deployment_id == "Qm456": + raise Exception("Test error") + return None + + analyzer.analyze_pois = Mock(side_effect=mock_analyze) + mock_db.cleanup_old_notifications = Mock() + + # This should not raise an exception and should continue processing + analyzer.process_new_submissions() + + # Verify we tried to process both submissions + assert analyzer.analyze_pois.call_count == 2 + # Verify cleanup was still called + mock_db.cleanup_old_notifications.assert_called_once() + +def test_get_recent_submissions_handles_api_errors(analyzer): + """Test handling of GraphQL API errors.""" + with patch('requests.post') as mock_post: + # Mock a failed API response + mock_post.side_effect = requests.exceptions.RequestException("API Error") + + result = analyzer._get_recent_submissions() + assert result == [] # Should return empty list on error + +def test_check_poi_reuse_with_multiple_reuses(analyzer): + """Test POI reuse detection with multiple reuse patterns.""" + submissions = { + "poi_hash_1": {"indexer1"}, + "poi_hash_2": {"indexer2"} + } + + now = datetime.now() + + # Mock database responses + mock_cursor = analyzer.db._test_cursor + mock_cursor.fetchall.return_value = [ + ("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", now), + ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", now), + ("poi_hash_2", "deployment1", 1000, b"addr1", "mainnet", now), + ("poi_hash_2", "deployment1", 950, b"addr1", "mainnet", now) + ] + + result = analyzer._check_poi_reuse(submissions) + assert len(result) == 2 # Both POIs were reused \ No newline at end of file diff --git a/services/poi_monitor/tests/test_database.py b/services/poi_monitor/tests/test_database.py new file mode 100644 index 0000000..e3e2ff6 --- /dev/null +++ b/services/poi_monitor/tests/test_database.py @@ -0,0 +1,114 @@ +import pytest +from datetime import datetime +from unittest.mock import Mock, MagicMock, patch +from src.database import Database +import psycopg2 + +@pytest.fixture +def mock_conn(): + conn = MagicMock() + with patch('psycopg2.connect', return_value=conn): + yield conn + +@pytest.fixture +def database(mock_conn): + """Create a Database instance with mocked connection.""" + with patch('psycopg2.pool.SimpleConnectionPool') as mock_pool: + # Mock the connection pool + pool = MagicMock() + pool.getconn.return_value = mock_conn + mock_pool.return_value = pool + + # Mock 
migrations + with patch('src.migration.MigrationManager') as mock_manager: + mock_manager_instance = Mock() + mock_manager.return_value = mock_manager_instance + + db = Database() + return db + +def test_database_connection_retry(mock_conn): + """Test that database connection retries on failure.""" + with patch('psycopg2.pool.SimpleConnectionPool') as mock_pool: + # Make pool creation fail twice then succeed + mock_pool.side_effect = [ + psycopg2.Error("Test error"), + psycopg2.Error("Test error"), + MagicMock() # Successful pool + ] + + # Mock migrations + with patch('src.migration.MigrationManager') as mock_manager: + mock_manager_instance = Mock() + mock_manager.return_value = mock_manager_instance + with patch('time.sleep'): # Don't actually sleep in tests + db = Database() + # Verify pool was created + assert db.pool is not None + +def test_get_latest_pois(database, mock_conn): + """Test fetching latest POI submissions.""" + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = [ + ("poi1", "indexer1"), + ("poi1", "indexer2"), + ("poi2", "indexer3") + ] + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + result = database.get_latest_pois("deployment1", 1000) + + assert len(result) == 2 # Two unique POIs + assert result["poi1"] == {"indexer1", "indexer2"} + assert result["poi2"] == {"indexer3"} + +def test_check_notification_sent(database, mock_conn): + """Test checking if notification was already sent.""" + mock_cursor = MagicMock() + mock_cursor.fetchone.return_value = (True,) + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + result = database.check_notification_sent("deployment1", 1000) + assert result is True + +def test_record_notification(database, mock_conn): + """Test recording a notification.""" + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + database.record_notification("deployment1", 1000, "test message") + + # Verify the INSERT query was executed with correct parameters + mock_cursor.execute.assert_any_call( + """ + WITH current_pois AS ( + SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set + FROM pois p + JOIN blocks b ON b.id = p.block_id + JOIN sg_deployments d ON d.id = p.sg_deployment_id + WHERE d.ipfs_cid = %s AND b.number = %s + ) + INSERT INTO poi_notifications (deployment_id, block_number, message, sent_at, poi_set) + SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] + FROM current_pois c + """, + ("deployment1", 1000, "deployment1", 1000, "test message") + ) + mock_conn.commit.assert_called_once() + +def test_cleanup_old_notifications(database, mock_conn): + """Test cleaning up old notifications.""" + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + database.cleanup_old_notifications(days=30) + + # Verify the DELETE query was executed with correct parameters + mock_cursor.execute.assert_any_call( + """ + DELETE FROM poi_notifications + WHERE sent_at < NOW() - INTERVAL '%s days' + """, + (30,) + ) + mock_conn.commit.assert_called_once() \ No newline at end of file diff --git a/services/poi_monitor/tests/test_migration.py b/services/poi_monitor/tests/test_migration.py new file mode 100644 index 0000000..3160c7a --- /dev/null +++ b/services/poi_monitor/tests/test_migration.py @@ -0,0 +1,38 @@ +import pytest +from unittest.mock import Mock, MagicMock, patch +from src.migration import MigrationManager +import os + +@pytest.fixture +def mock_conn(): + return MagicMock() + +@pytest.fixture +def manager(mock_conn): 
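+    """Build a MigrationManager against the fully mocked connection."""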
+ return MigrationManager(mock_conn) + +def test_ensure_migration_table(manager, mock_conn): + """Test migration table creation.""" + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + # Reset the mock to clear any previous calls + mock_conn.commit.reset_mock() + + manager._ensure_migration_table() + + mock_cursor.execute.assert_called_once() + mock_conn.commit.assert_called_once() + +def test_get_applied_migrations(manager, mock_conn): + """Test fetching applied migrations.""" + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = [ + ("001_create_notifications_table",), + ("002_another_migration",) + ] + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + result = manager.get_applied_migrations() + assert len(result) == 2 + assert "001_create_notifications_table" in result \ No newline at end of file diff --git a/services/poi_monitor/tests/test_monitor.py b/services/poi_monitor/tests/test_monitor.py new file mode 100644 index 0000000..875c9f2 --- /dev/null +++ b/services/poi_monitor/tests/test_monitor.py @@ -0,0 +1,69 @@ +import pytest +from unittest.mock import Mock, patch +from src.monitor import main +from src.database import Database +from src.notification import SlackNotifier +from src.analyzer import PoiAnalyzer + +def test_main_initializes_components(): + """Test that main properly initializes all components.""" + with patch('src.monitor.Database') as mock_db_class, \ + patch('src.monitor.SlackNotifier') as mock_notifier_class, \ + patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ + patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt): # Break the loop + + # Setup mocks + mock_db = Mock(spec=Database) + mock_notifier = Mock(spec=SlackNotifier) + mock_analyzer = Mock(spec=PoiAnalyzer) + + mock_db_class.return_value = mock_db + mock_notifier_class.return_value = mock_notifier + mock_analyzer_class.return_value = mock_analyzer + + # Run main (will be interrupted by KeyboardInterrupt) + main() + + # Verify components were initialized + mock_db_class.assert_called_once() + mock_notifier_class.assert_called_once() + mock_analyzer_class.assert_called_once_with(mock_db, mock_notifier) + +def test_main_handles_initialization_error(): + """Test that main properly handles initialization errors.""" + with patch('src.monitor.Database', side_effect=Exception("Test error")), \ + patch('src.monitor.logger') as mock_logger: + + with pytest.raises(Exception): + main() + + mock_logger.error.assert_called_once() + +def test_main_processes_submissions(): + """Test that main calls process_new_submissions.""" + with patch('src.monitor.Database') as mock_db_class, \ + patch('src.monitor.SlackNotifier') as mock_notifier_class, \ + patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ + patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt): # Stop after first run + + # Setup mocks + mock_analyzer = Mock(spec=PoiAnalyzer) + mock_analyzer_class.return_value = mock_analyzer + + # Run main + main() + + # Verify process_new_submissions was called + assert mock_analyzer.process_new_submissions.call_count == 1 + +def test_main_handles_keyboard_interrupt(): + """Test that main handles keyboard interrupt gracefully.""" + with patch('src.monitor.Database') as mock_db_class, \ + patch('src.monitor.SlackNotifier') as mock_notifier_class, \ + patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ + patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt), \ + patch('src.monitor.logger') as 
mock_logger: + + main() + + mock_logger.info.assert_any_call("Shutting down POI Monitor service...") \ No newline at end of file diff --git a/services/poi_monitor/tests/test_notification.py b/services/poi_monitor/tests/test_notification.py new file mode 100644 index 0000000..bfe7791 --- /dev/null +++ b/services/poi_monitor/tests/test_notification.py @@ -0,0 +1,49 @@ +import pytest +from unittest.mock import Mock, patch +from src.notification import SlackNotifier +import requests + +@pytest.fixture +def notifier(): + with patch.dict('os.environ', {'SLACK_WEBHOOK_URL': 'http://test.url'}): + return SlackNotifier() + +def test_send_notification_success(notifier): + """Test successful notification sending.""" + with patch('requests.post') as mock_post: + mock_post.return_value.raise_for_status.return_value = None + + result = notifier.send_notification("test message") + + assert result is True + mock_post.assert_called_once() + +def test_send_notification_failure(notifier): + """Test handling of notification failure.""" + with patch('requests.post') as mock_post: + mock_post.side_effect = requests.exceptions.RequestException("Test error") + + result = notifier.send_notification("test message") + + assert result is False + +def test_format_poi_discrepancy_message(notifier): + """Test message formatting.""" + data = { + 'deployment_cid': 'Qm123', + 'block_number': 1000, + 'submissions': { + 'poi1': {'indexer1', 'indexer2'}, + 'poi2': {'indexer3'} + }, + 'reuse_info': { + 'poi2': ['Previously used in deployment X'] + } + } + + message = notifier.format_poi_discrepancy_message(data) + + assert '🚨' in message + assert 'Qm123' in message + assert 'indexer1' in message + assert 'Previously used in deployment X' in message \ No newline at end of file From f1b1d315377f8d9a6595a91b525bb8de7a8b863c Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Fri, 10 Jan 2025 16:32:14 +0000 Subject: [PATCH 3/5] ruff check/format --- .../001_create_notifications_table.py | 12 +- services/poi_monitor/requirements-dev.txt | 3 +- services/poi_monitor/setup.py | 2 +- services/poi_monitor/src/__init__.py | 2 +- services/poi_monitor/src/analyzer.py | 227 ++++++++++-------- services/poi_monitor/src/database.py | 63 ++--- services/poi_monitor/src/migration.py | 32 +-- services/poi_monitor/src/monitor.py | 34 ++- services/poi_monitor/src/notification.py | 40 +-- services/poi_monitor/tests/test_analyzer.py | 78 +++--- services/poi_monitor/tests/test_database.py | 50 ++-- services/poi_monitor/tests/test_migration.py | 19 +- services/poi_monitor/tests/test_monitor.py | 63 ++--- .../poi_monitor/tests/test_notification.py | 45 ++-- 14 files changed, 360 insertions(+), 310 deletions(-) diff --git a/services/poi_monitor/migrations/001_create_notifications_table.py b/services/poi_monitor/migrations/001_create_notifications_table.py index edbf886..ca63f14 100644 --- a/services/poi_monitor/migrations/001_create_notifications_table.py +++ b/services/poi_monitor/migrations/001_create_notifications_table.py @@ -4,9 +4,10 @@ logger = logging.getLogger(__name__) + def migrate_up(conn: Optional[psycopg2.extensions.connection] = None) -> None: """Create the poi_notifications table.""" - + up_sql = """ CREATE TABLE IF NOT EXISTS poi_notifications ( id SERIAL PRIMARY KEY, @@ -20,7 +21,7 @@ def migrate_up(conn: Optional[psycopg2.extensions.connection] = None) -> None: CREATE INDEX IF NOT EXISTS idx_poi_notifications_sent_at ON poi_notifications(sent_at); """ - + try: with conn.cursor() as cur: cur.execute(up_sql) @@ -31,13 +32,14 @@ def 
migrate_up(conn: Optional[psycopg2.extensions.connection] = None) -> None: logger.error(f"Failed to create poi_notifications table: {str(e)}") raise + def migrate_down(conn: Optional[psycopg2.extensions.connection] = None) -> None: """Remove the poi_notifications table.""" - + down_sql = """ DROP TABLE IF EXISTS poi_notifications; """ - + try: with conn.cursor() as cur: cur.execute(down_sql) @@ -46,4 +48,4 @@ def migrate_down(conn: Optional[psycopg2.extensions.connection] = None) -> None: except Exception as e: conn.rollback() logger.error(f"Failed to drop poi_notifications table: {str(e)}") - raise \ No newline at end of file + raise diff --git a/services/poi_monitor/requirements-dev.txt b/services/poi_monitor/requirements-dev.txt index 6070917..ca770e4 100644 --- a/services/poi_monitor/requirements-dev.txt +++ b/services/poi_monitor/requirements-dev.txt @@ -1,3 +1,4 @@ pytest==8.0.0 pytest-cov==4.1.0 -pytest-mock==3.12.0 \ No newline at end of file +pytest-mock==3.12.0 +ruff==0.3.0 \ No newline at end of file diff --git a/services/poi_monitor/setup.py b/services/poi_monitor/setup.py index b69e48f..b0584c8 100644 --- a/services/poi_monitor/setup.py +++ b/services/poi_monitor/setup.py @@ -12,4 +12,4 @@ "pyyaml", "python-json-logger", ], -) \ No newline at end of file +) diff --git a/services/poi_monitor/src/__init__.py b/services/poi_monitor/src/__init__.py index c7d9a6d..7ff37fb 100644 --- a/services/poi_monitor/src/__init__.py +++ b/services/poi_monitor/src/__init__.py @@ -18,4 +18,4 @@ from .monitor import main __version__ = "0.1.0" -__all__ = ['PoiAnalyzer', 'Database', 'SlackNotifier', 'main'] +__all__ = ["PoiAnalyzer", "Database", "SlackNotifier", "main"] diff --git a/services/poi_monitor/src/analyzer.py b/services/poi_monitor/src/analyzer.py index 38819eb..9a96ac2 100644 --- a/services/poi_monitor/src/analyzer.py +++ b/services/poi_monitor/src/analyzer.py @@ -1,7 +1,6 @@ # Import external libraries import logging -from typing import Dict, Set, List, Optional, Tuple -from datetime import datetime, timedelta +from typing import Dict, Set, List, Optional import os import requests @@ -12,11 +11,12 @@ # Configure logging logger = logging.getLogger(__name__) + class PoiAnalyzer: """Class to analyze POI submissions and notify slack if discrepancies are found.""" - + def __init__(self, database: Database, notifier: SlackNotifier): - """Initialize the POI analyzer with a database and notifier. + """Initialize the POI analyzer with a database and notifier. These are internal modules that are injected into the class.""" self.db = database self.notifier = notifier @@ -24,7 +24,7 @@ def __init__(self, database: Database, notifier: SlackNotifier): def analyze_pois(self, deployment_id: str, block_number: int) -> Optional[Dict]: """Analyze POI submissions and detect discrepancies between indexers. - + This method checks if different indexers have submitted different POI (Proof of Indexing) hashes for the same deployment and block. A discrepancy indicates that indexers disagree about the correct POI value. 
@@ -38,7 +38,7 @@ def analyze_pois(self, deployment_id: str, block_number: int) -> Optional[Dict]: - Notification was already sent - No POI submissions exist - Only one unique POI hash exists (no discrepancy) - + Returns a dictionary containing discrepancy details if multiple different POI hashes are found, with structure: { @@ -54,46 +54,54 @@ def analyze_pois(self, deployment_id: str, block_number: int) -> Optional[Dict]: try: # Skip if we've already notified about this deployment/block if self.db.check_notification_sent(deployment_id, block_number): - logger.debug(f"Already notified about {deployment_id} at block {block_number}") + logger.debug( + f"Already notified about {deployment_id} at block {block_number}" + ) return None # Get all POI submissions for this deployment/block poi_submissions = self.db.get_latest_pois(deployment_id, block_number) - + if not poi_submissions: - logger.debug(f"No POI submissions found for {deployment_id} at block {block_number}") + logger.debug( + f"No POI submissions found for {deployment_id} at block {block_number}" + ) return None - + # If there's only one POI hash, there's no discrepancy if len(poi_submissions) == 1: - logger.debug(f"All indexers agree on POI for {deployment_id} at block {block_number}") + logger.debug( + f"All indexers agree on POI for {deployment_id} at block {block_number}" + ) return None - + # We have a discrepancy - format the data discrepancy_data = { - 'deployment_cid': deployment_id, - 'block_number': block_number, - 'submissions': poi_submissions, - 'reuse_info': self._check_poi_reuse(poi_submissions) + "deployment_cid": deployment_id, + "block_number": block_number, + "submissions": poi_submissions, + "reuse_info": self._check_poi_reuse(poi_submissions), } - + return discrepancy_data - + except Exception as e: logger.error(f"Error analyzing POIs: {str(e)}", exc_info=True) raise - def _check_poi_reuse(self, submissions: Dict[str, Set[str]]) -> Dict[str, List[str]]: + def _check_poi_reuse( + self, submissions: Dict[str, Set[str]] + ) -> Dict[str, List[str]]: """Check if any POIs have been reused from other blocks/deployments. 
- + Params: submissions: Dictionary mapping POI hashes to sets of indexer addresses - + Returns: Dict mapping POI hashes to lists of reuse information """ reuse_info = {} - + query = """ SELECT p.poi, @@ -110,55 +118,63 @@ def _check_poi_reuse(self, submissions: Dict[str, Set[str]]) -> Dict[str, List[s WHERE p.poi = ANY(%s) ORDER BY p.created_at DESC """ - + try: - with self.db.get_connection() as conn: - with conn.cursor() as cur: - poi_hashes = list(submissions.keys()) - cur.execute(query, (poi_hashes,)) - results = cur.fetchall() - - # Group results by POI hash - poi_occurrences = {} - for row in results: - (poi_hash, deployment_id, block_number, - indexer_addr, network, timestamp) = row - if poi_hash not in poi_occurrences: - poi_occurrences[poi_hash] = [] - poi_occurrences[poi_hash].append({ - 'deployment_id': deployment_id, - 'block_number': block_number, - 'indexer_address': indexer_addr, - 'network': network, - 'timestamp': timestamp - }) - - # Format detailed reuse information - for poi_hash, occurrences in poi_occurrences.items(): - if len(occurrences) > 1: # POI appears more than once - reuse_info[poi_hash] = [] - - # Sort by timestamp descending - occurrences.sort(key=lambda x: x['timestamp'], reverse=True) - - # First occurrence is current - current = occurrences[0] - - # Add details for each previous use - for prev in occurrences[1:]: - time_diff = current['timestamp'] - prev['timestamp'] - days_ago = time_diff.days - - reuse_info[poi_hash].append( - f"Previously used {days_ago} days ago:\n" - f"• Network: {prev['network']}\n" - f"• Deployment: {prev['deployment_id']}\n" - f"• Block: {prev['block_number']}\n" - f"• Indexer: {prev['indexer_address']}" - ) - - return reuse_info - + with self.db.get_connection() as conn: + with conn.cursor() as cur: + poi_hashes = list(submissions.keys()) + cur.execute(query, (poi_hashes,)) + results = cur.fetchall() + + # Group results by POI hash + poi_occurrences = {} + for row in results: + ( + poi_hash, + deployment_id, + block_number, + indexer_addr, + network, + timestamp, + ) = row + if poi_hash not in poi_occurrences: + poi_occurrences[poi_hash] = [] + poi_occurrences[poi_hash].append( + { + "deployment_id": deployment_id, + "block_number": block_number, + "indexer_address": indexer_addr, + "network": network, + "timestamp": timestamp, + } + ) + + # Format detailed reuse information + for poi_hash, occurrences in poi_occurrences.items(): + if len(occurrences) > 1: # POI appears more than once + reuse_info[poi_hash] = [] + + # Sort by timestamp descending + occurrences.sort(key=lambda x: x["timestamp"], reverse=True) + + # First occurrence is current + current = occurrences[0] + + # Add details for each previous use + for prev in occurrences[1:]: + time_diff = current["timestamp"] - prev["timestamp"] + days_ago = time_diff.days + + reuse_info[poi_hash].append( + f"Previously used {days_ago} days ago:\n" + f"• Network: {prev['network']}\n" + f"• Deployment: {prev['deployment_id']}\n" + f"• Block: {prev['block_number']}\n" + f"• Indexer: {prev['indexer_address']}" + ) + + return reuse_info + except Exception as e: logger.error(f"Error checking POI reuse: {str(e)}", exc_info=True) return {} @@ -167,38 +183,41 @@ def process_new_submissions(self) -> None: """Process any new POI submissions and send notifications for discrepancies.""" try: recent_submissions = self._get_recent_submissions() - + for deployment_id, block_number in recent_submissions: try: discrepancy = self.analyze_pois(deployment_id, block_number) - + if discrepancy: # Format 
and send notification - message = self.notifier.format_poi_discrepancy_message(discrepancy) + message = self.notifier.format_poi_discrepancy_message( + discrepancy + ) if self.notifier.send_notification(message): # Record that we sent the notification self.db.record_notification( - deployment_id, - block_number, - message + deployment_id, block_number, message ) except Exception as e: # Log the error but continue processing other submissions - logger.error(f"Error processing submission for deployment {deployment_id} at block {block_number}: {str(e)}", exc_info=True) + logger.error( + f"Error processing submission for deployment {deployment_id} at block {block_number}: {str(e)}", + exc_info=True, + ) continue - + # Cleanup old POI notifications from the database self.db.cleanup_old_notifications(days=60) - + except Exception as e: logger.error(f"Error processing submissions: {str(e)}", exc_info=True) raise def _get_recent_submissions(self) -> List[tuple[str, int]]: """Get list of recent deployment/block combinations to check.""" - graphql_url = os.getenv('GRAPHIX_API_URL', 'http://localhost:8000/graphql') + graphql_url = os.getenv("GRAPHIX_API_URL", "http://localhost:8000/graphql") submissions = set() - + query = """ query { poiAgreementRatios( @@ -219,43 +238,43 @@ def _get_recent_submissions(self) -> List[tuple[str, int]]: } } """ - + try: indexers = self._get_indexers() if not indexers: logger.error("No indexers found") return [] - + for indexer_address in indexers: logger.debug(f"Fetching POIs for indexer {indexer_address}") current_query = query % indexer_address - + response = requests.post( - graphql_url, - json={'query': current_query}, - timeout=10 + graphql_url, json={"query": current_query}, timeout=10 ) response.raise_for_status() data = response.json() - - if 'errors' in data: + + if "errors" in data: logger.error(f"GraphQL errors: {data['errors']}") break - - if 'data' not in data or 'poiAgreementRatios' not in data['data']: + + if "data" not in data or "poiAgreementRatios" not in data["data"]: logger.error("Unexpected GraphQL response format") break - + # Extract POIs from current page - agreements = data['data']['poiAgreementRatios'] + agreements = data["data"]["poiAgreementRatios"] for agreement in agreements: - submissions.add(( - agreement['poi']['deployment']['cid'], - agreement['poi']['block']['number'] - )) - + submissions.add( + ( + agreement["poi"]["deployment"]["cid"], + agreement["poi"]["block"]["number"], + ) + ) + return list(submissions) - + except requests.exceptions.RequestException as e: logger.error(f"Failed to fetch recent submissions: {str(e)}") return [] @@ -276,14 +295,12 @@ def _get_indexers(self) -> List[str]: current_query = query % self.page_size logger.debug(f"Fetching indexers with limit {self.page_size}") response = requests.post( - os.getenv('GRAPHIX_API_URL'), - json={'query': current_query}, - timeout=10 + os.getenv("GRAPHIX_API_URL"), json={"query": current_query}, timeout=10 ) data = response.json() - if 'data' in data and 'indexers' in data['data']: - indexers = data['data']['indexers'] - all_indexers.extend([indexer['address'] for indexer in indexers]) + if "data" in data and "indexers" in data["data"]: + indexers = data["data"]["indexers"] + all_indexers.extend([indexer["address"] for indexer in indexers]) return all_indexers return [] except Exception as e: diff --git a/services/poi_monitor/src/database.py b/services/poi_monitor/src/database.py index 14fe130..df4a4b0 100644 --- a/services/poi_monitor/src/database.py +++ 
b/services/poi_monitor/src/database.py
@@ -1,7 +1,6 @@
 import os
 import logging
 from typing import Dict, Set
-import psycopg2
 from psycopg2.pool import SimpleConnectionPool
 from contextlib import contextmanager
 from dotenv import load_dotenv
@@ -9,6 +8,7 @@
 logger = logging.getLogger(__name__)
 load_dotenv()
 
+
 class Database:
     """
     This class is the Database manager for POI monitoring.
@@ -22,7 +22,7 @@ class Database:
     def __init__(self):
         """
         Constructor initializes database connection and runs migrations.
-        
+
         Raises:
             psycopg2.Error: If database connection cannot be established
             Exception: If migrations fail to apply
@@ -31,13 +31,13 @@ def __init__(self):
         self.pool = SimpleConnectionPool(
             minconn=1,
             maxconn=10,
-            dbname=os.getenv('POSTGRES_DB', 'graphix'),
-            user=os.getenv('POSTGRES_USER', 'postgres'),
-            password=os.getenv('POSTGRES_PASSWORD', 'password'),
-            host=os.getenv('POSTGRES_HOST', 'localhost'),
-            port=os.getenv('POSTGRES_PORT', '5433')
+            dbname=os.getenv("POSTGRES_DB", "graphix"),
+            user=os.getenv("POSTGRES_USER", "postgres"),
+            password=os.getenv("POSTGRES_PASSWORD", "password"),
+            host=os.getenv("POSTGRES_HOST", "localhost"),
+            port=os.getenv("POSTGRES_PORT", "5433"),
         )
-        
+
         # Get initial connection for migrations
         with self.get_connection() as conn:
             self.conn = conn  # Store temporary reference for migrations
@@ -49,7 +49,7 @@ def get_connection(self):
         """
         Get a database connection from the pool with automatic cleanup
         (this is faster than creating a new connection each time we want to talk to the db)
-        
+
         - Automatically returns connection to pool after use
         - Handles cleanup even if an exception occurs
         """
@@ -59,9 +59,11 @@ def get_connection(self):
         finally:
             self.pool.putconn(conn)
 
-    def get_latest_pois(self, deployment_id: str, block_number: int) -> Dict[str, Set[str]]:
+    def get_latest_pois(
+        self, deployment_id: str, block_number: int
+    ) -> Dict[str, Set[str]]:
         """Fetch all indexer POI submissions for a specific deployment and block.
-        
+
         Retrieves all POI submissions and the indexers that submitted them
         for a given deployment at a specific block number.
 
@@ -85,27 +87,27 @@ def get_latest_pois(self, deployment_id: str, block_number: int) -> Dict[str, Se
         JOIN sg_deployments d ON d.id = p.sg_deployment_id
         WHERE d.ipfs_cid = %s AND b.number = %s
         """
-        
+
         with self.get_connection() as conn:
             with conn.cursor() as cur:
                 cur.execute(query, (deployment_id, block_number))
                 results = cur.fetchall()
-        
+
                 poi_submissions = {}
                 for poi_hash, indexer_addr in results:
                     if poi_hash not in poi_submissions:
                         poi_submissions[poi_hash] = set()
                     poi_submissions[poi_hash].add(indexer_addr)
-        
+
                 return poi_submissions
 
     def check_notification_sent(self, deployment_id: str, block_number: int) -> bool:
         """Check if we've already notified about the current set of POIs for this deployment/block.
- + Params: deployment_id: The deployment CID block_number: The block number - + Returns: bool: True if we've already notified about these exact POIs """ @@ -126,21 +128,25 @@ def check_notification_sent(self, deployment_id: str, block_number: int) -> bool AND n.poi_set = c.poi_set ) """ - + with self.get_connection() as conn: with conn.cursor() as cur: - cur.execute(query, (deployment_id, block_number, deployment_id, block_number)) + cur.execute( + query, (deployment_id, block_number, deployment_id, block_number) + ) return cur.fetchone()[0] - def record_notification(self, deployment_id: str, block_number: int, message: str) -> None: + def record_notification( + self, deployment_id: str, block_number: int, message: str + ) -> None: """Record that a notification was sent. Later used to prevent duplicate notifications/spam. - + Params: deployment_id: The deployment IPFS hash block_number: The block number message: The notification message that was sent """ - + query = """ WITH current_pois AS ( SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set @@ -153,10 +159,13 @@ def record_notification(self, deployment_id: str, block_number: int, message: st SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] FROM current_pois c """ - + with self.get_connection() as conn: with conn.cursor() as cur: - cur.execute(query, (deployment_id, block_number, deployment_id, block_number, message)) + cur.execute( + query, + (deployment_id, block_number, deployment_id, block_number, message), + ) conn.commit() def cleanup_old_notifications(self, days: int = 60) -> None: @@ -165,7 +174,7 @@ def cleanup_old_notifications(self, days: int = 60) -> None: DELETE FROM poi_notifications WHERE sent_at < NOW() - INTERVAL '%s days' """ - + with self.get_connection() as conn: with conn.cursor() as cur: cur.execute(query, (days,)) @@ -174,11 +183,11 @@ def cleanup_old_notifications(self, days: int = 60) -> None: def _run_migrations(self): """Run any pending database migrations.""" from .migration import MigrationManager - + try: manager = MigrationManager(self.conn) manager.apply_migrations() - + # Verify table structure with self.conn.cursor() as cur: cur.execute(""" @@ -188,7 +197,7 @@ def _run_migrations(self): """) columns = cur.fetchall() logger.info(f"poi_notifications table structure: {columns}") - + except Exception as e: logger.error(f"Failed to run migrations: {str(e)}") raise diff --git a/services/poi_monitor/src/migration.py b/services/poi_monitor/src/migration.py index b588fad..926e954 100644 --- a/services/poi_monitor/src/migration.py +++ b/services/poi_monitor/src/migration.py @@ -6,6 +6,7 @@ logger = logging.getLogger(__name__) + class MigrationManager: def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn @@ -34,27 +35,30 @@ def get_applied_migrations(self) -> List[str]: def apply_migrations(self): """Apply all pending migrations.""" applied = set(self.get_applied_migrations()) - + # Get all migration files - migrations_dir = os.path.join(os.path.dirname(__file__), '..', 'migrations') + migrations_dir = os.path.join(os.path.dirname(__file__), "..", "migrations") logger.info(f"Looking for migrations in: {migrations_dir}") - migration_files = sorted([ - f for f in os.listdir(migrations_dir) - if (f.endswith('.py') and f != '__init__.py') or f.endswith('.sql') - ]) + migration_files = sorted( + [ + f + for f in os.listdir(migrations_dir) + if (f.endswith(".py") and f != "__init__.py") or f.endswith(".sql") + ] + ) logger.info(f"Found migration files: {migration_files}") for migration_file in 
migration_files: migration_name = migration_file[:-3] - + if migration_name in applied: logger.info(f"Skipping already applied migration: {migration_name}") continue logger.info(f"Applying migration: {migration_name}") - + try: - if migration_file.endswith('.sql'): + if migration_file.endswith(".sql"): # Handle SQL files with open(os.path.join(migrations_dir, migration_file)) as f: sql = f.read() @@ -65,18 +69,18 @@ def apply_migrations(self): # Import and run Python migrations module = importlib.import_module(f"migrations.{migration_name}") module.migrate_up(self.conn) - + # Record the migration with self.conn.cursor() as cur: cur.execute( "INSERT INTO poi_monitor_migrations (migration_name) VALUES (%s)", - (migration_name,) + (migration_name,), ) self.conn.commit() - + logger.info(f"Successfully applied migration: {migration_name}") - + except Exception as e: self.conn.rollback() logger.error(f"Failed to apply migration {migration_name}: {str(e)}") - raise \ No newline at end of file + raise diff --git a/services/poi_monitor/src/monitor.py b/services/poi_monitor/src/monitor.py index bc11c4f..9b6d3cd 100644 --- a/services/poi_monitor/src/monitor.py +++ b/services/poi_monitor/src/monitor.py @@ -12,32 +12,25 @@ # Configure logging logging_config = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'json': { - '()': 'pythonjsonlogger.jsonlogger.JsonFormatter', - 'format': '%(asctime)s %(levelname)s %(name)s %(message)s' + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "json": { + "()": "pythonjsonlogger.jsonlogger.JsonFormatter", + "format": "%(asctime)s %(levelname)s %(name)s %(message)s", } }, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - 'formatter': 'json' - } - }, - 'root': { - 'handlers': ['console'], - 'level': 'INFO' - } + "handlers": {"console": {"class": "logging.StreamHandler", "formatter": "json"}}, + "root": {"handlers": ["console"], "level": "INFO"}, } dictConfig(logging_config) logger = logging.getLogger(__name__) + def main(): logger.info("Starting POI Monitor service...") - + # Initialize components try: db = Database() @@ -47,18 +40,19 @@ def main(): except Exception as e: logger.error(f"Failed to initialize components: {str(e)}") raise - + try: while True: logger.info("Running POI check iteration") analyzer.process_new_submissions() - time.sleep(int(os.getenv('CHECK_INTERVAL', 300))) # Default 5 minutes - + time.sleep(int(os.getenv("CHECK_INTERVAL", 300))) # Default 5 minutes + except KeyboardInterrupt: logger.info("Shutting down POI Monitor service...") except Exception as e: logger.error(f"Unexpected error: {str(e)}", exc_info=True) raise + if __name__ == "__main__": main() diff --git a/services/poi_monitor/src/notification.py b/services/poi_monitor/src/notification.py index eb8f0a5..dfeb833 100644 --- a/services/poi_monitor/src/notification.py +++ b/services/poi_monitor/src/notification.py @@ -5,51 +5,51 @@ logger = logging.getLogger(__name__) + class SlackNotifier: """This class is used to send notifications to a Slack channel.""" + def __init__(self, webhook_url: str = None): """ Initializes the SlackNotifier with a webhook URL. 
- + Params: webhook_url: The Slack webhook URL Raises: ValueError: If the Slack webhook URL is not provided """ - self.webhook_url = webhook_url or os.getenv('SLACK_WEBHOOK_URL') + self.webhook_url = webhook_url or os.getenv("SLACK_WEBHOOK_URL") if not self.webhook_url: raise ValueError("Slack webhook URL not provided") def send_notification(self, message: str) -> bool: """Send a notification to Slack. - + Params: message: The formatted message to send - + Returns: bool: True if the message was sent successfully """ try: response = requests.post( - self.webhook_url, - json={"text": message}, - timeout=10 + self.webhook_url, json={"text": message}, timeout=10 ) response.raise_for_status() logger.info("Successfully sent Slack notification") return True - + except requests.exceptions.RequestException as e: logger.error(f"Failed to send Slack notification: {str(e)}") return False def format_poi_discrepancy_message(self, data: Dict[str, Any]) -> str: """Format POI discrepancy data into a Slack message. - + Params: data: Dictionary containing POI discrepancy information - + Returns: str: Formatted message ready to send to Slack """ @@ -57,23 +57,23 @@ def format_poi_discrepancy_message(self, data: Dict[str, Any]) -> str: "🚨 *New POI Discrepancy Found*", f"*Deployment:* `{data['deployment_cid']}`", f"*Block:* `{data['block_number']}`", - "*POI Submissions:*" + "*POI Submissions:*", ] - for poi_hash, indexers in data['submissions'].items(): + for poi_hash, indexers in data["submissions"].items(): # Convert memoryview to hex string if needed if isinstance(poi_hash, memoryview): poi_hash = poi_hash.hex() - + submission_parts = [f"*POI Hash:* `{poi_hash}`"] - + # Add POI reuse information if available - if 'reuse_info' in data and poi_hash in data['reuse_info']: - reuse_data = data['reuse_info'][poi_hash] + if "reuse_info" in data and poi_hash in data["reuse_info"]: + reuse_data = data["reuse_info"][poi_hash] submission_parts.append("⚠️ *POI Reuse:*") for detail in reuse_data: submission_parts.append(f" • {detail}") - + # Convert indexer addresses if they're memoryview formatted_indexers = [] for indexer in indexers: @@ -81,8 +81,10 @@ def format_poi_discrepancy_message(self, data: Dict[str, Any]) -> str: formatted_indexers.append(indexer.hex()) else: formatted_indexers.append(str(indexer)) - - submission_parts.append(f"*Submitted by:* `{', '.join(sorted(formatted_indexers))}`") + + submission_parts.append( + f"*Submitted by:* `{', '.join(sorted(formatted_indexers))}`" + ) message_parts.extend(submission_parts) message_parts.append("") # Add spacing between submissions diff --git a/services/poi_monitor/tests/test_analyzer.py b/services/poi_monitor/tests/test_analyzer.py index 10a73dc..492565c 100644 --- a/services/poi_monitor/tests/test_analyzer.py +++ b/services/poi_monitor/tests/test_analyzer.py @@ -6,6 +6,7 @@ from src.notification import SlackNotifier import requests + @pytest.fixture def mock_db(): db = Mock(spec=Database) @@ -13,71 +14,73 @@ def mock_db(): context_mock = MagicMock() mock_conn = MagicMock() mock_cursor = MagicMock() - + # Set up the context manager chain db.get_connection.return_value = context_mock context_mock.__enter__.return_value = mock_conn mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + # Store cursor on db for tests that need it db._test_cursor = mock_cursor return db # Return just the db, not the tuple + @pytest.fixture def mock_notifier(): return Mock(spec=SlackNotifier) + @pytest.fixture def analyzer(mock_db, mock_notifier): return 
PoiAnalyzer(mock_db, mock_notifier) + def test_analyze_pois_no_discrepancy(analyzer, mock_db): # Setup deployment_id = "Qm123" block_number = 1000 - + # Mock database responses mock_db.check_notification_sent.return_value = False mock_db.get_latest_pois.return_value = { "poi_hash_1": {"indexer1", "indexer2"} # Single POI hash = no discrepancy } - + # Execute result = analyzer.analyze_pois(deployment_id, block_number) - + # Assert assert result is None mock_db.check_notification_sent.assert_called_once_with(deployment_id, block_number) mock_db.get_latest_pois.assert_called_once_with(deployment_id, block_number) + def test_analyze_pois_with_discrepancy(analyzer, mock_db): # Setup deployment_id = "Qm123" block_number = 1000 - + # Mock database responses mock_db.check_notification_sent.return_value = False mock_db.get_latest_pois.return_value = { "poi_hash_1": {"indexer1"}, - "poi_hash_2": {"indexer2"} # Two different POI hashes = discrepancy + "poi_hash_2": {"indexer2"}, # Two different POI hashes = discrepancy } - + # Execute result = analyzer.analyze_pois(deployment_id, block_number) - + # Assert assert result is not None assert result["deployment_cid"] == deployment_id assert result["block_number"] == block_number assert result["submissions"] == mock_db.get_latest_pois.return_value + def test_check_poi_reuse(analyzer): """Test POI reuse detection.""" # Setup - submissions = { - "poi_hash_1": {"indexer1"}, - "poi_hash_2": {"indexer2"} - } + submissions = {"poi_hash_1": {"indexer1"}, "poi_hash_2": {"indexer2"}} # Mock database responses mock_cursor = analyzer.db._test_cursor @@ -85,7 +88,7 @@ def test_check_poi_reuse(analyzer): # Match the columns from the query: # poi, deployment_id, block_number, indexer_address, network_name, submission_time ("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", datetime.now()), - ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", datetime.now()) + ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", datetime.now()), ] # Execute @@ -96,79 +99,80 @@ def test_check_poi_reuse(analyzer): assert len(result["poi_hash_1"]) == 1 assert "Previously used" in result["poi_hash_1"][0] + def test_analyze_pois_already_notified(analyzer, mock_db): """Test that we don't re-notify about known discrepancies.""" deployment_id = "Qm123" block_number = 1000 - + mock_db.check_notification_sent.return_value = True - + result = analyzer.analyze_pois(deployment_id, block_number) assert result is None mock_db.get_latest_pois.assert_not_called() + def test_analyze_pois_no_submissions(analyzer, mock_db): """Test handling of blocks with no POI submissions.""" deployment_id = "Qm123" block_number = 1000 - + mock_db.check_notification_sent.return_value = False mock_db.get_latest_pois.return_value = {} - + result = analyzer.analyze_pois(deployment_id, block_number) assert result is None + def test_process_new_submissions_handles_errors(analyzer, mock_db): """Test error handling in the main processing loop.""" # Mock _get_recent_submissions to return some test data - analyzer._get_recent_submissions = Mock(return_value=[ - ("Qm123", 1000), - ("Qm456", 2000) - ]) - + analyzer._get_recent_submissions = Mock( + return_value=[("Qm123", 1000), ("Qm456", 2000)] + ) + # Make analyze_pois raise an exception for the second submission def mock_analyze(deployment_id, block_number): if deployment_id == "Qm456": raise Exception("Test error") return None - + analyzer.analyze_pois = Mock(side_effect=mock_analyze) mock_db.cleanup_old_notifications = Mock() - + # This should not raise an 
exception and should continue processing analyzer.process_new_submissions() - + # Verify we tried to process both submissions assert analyzer.analyze_pois.call_count == 2 # Verify cleanup was still called mock_db.cleanup_old_notifications.assert_called_once() + def test_get_recent_submissions_handles_api_errors(analyzer): """Test handling of GraphQL API errors.""" - with patch('requests.post') as mock_post: + with patch("requests.post") as mock_post: # Mock a failed API response mock_post.side_effect = requests.exceptions.RequestException("API Error") - + result = analyzer._get_recent_submissions() assert result == [] # Should return empty list on error + def test_check_poi_reuse_with_multiple_reuses(analyzer): """Test POI reuse detection with multiple reuse patterns.""" - submissions = { - "poi_hash_1": {"indexer1"}, - "poi_hash_2": {"indexer2"} - } - + submissions = {"poi_hash_1": {"indexer1"}, "poi_hash_2": {"indexer2"}} + now = datetime.now() - + # Mock database responses mock_cursor = analyzer.db._test_cursor mock_cursor.fetchall.return_value = [ ("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", now), ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", now), ("poi_hash_2", "deployment1", 1000, b"addr1", "mainnet", now), - ("poi_hash_2", "deployment1", 950, b"addr1", "mainnet", now) + ("poi_hash_2", "deployment1", 950, b"addr1", "mainnet", now), ] - + result = analyzer._check_poi_reuse(submissions) - assert len(result) == 2 # Both POIs were reused \ No newline at end of file + assert len(result) == 2 # Both POIs were reused diff --git a/services/poi_monitor/tests/test_database.py b/services/poi_monitor/tests/test_database.py index e3e2ff6..14cabd6 100644 --- a/services/poi_monitor/tests/test_database.py +++ b/services/poi_monitor/tests/test_database.py @@ -1,83 +1,88 @@ import pytest -from datetime import datetime from unittest.mock import Mock, MagicMock, patch from src.database import Database import psycopg2 + @pytest.fixture def mock_conn(): conn = MagicMock() - with patch('psycopg2.connect', return_value=conn): + with patch("psycopg2.connect", return_value=conn): yield conn + @pytest.fixture def database(mock_conn): """Create a Database instance with mocked connection.""" - with patch('psycopg2.pool.SimpleConnectionPool') as mock_pool: + with patch("psycopg2.pool.SimpleConnectionPool") as mock_pool: # Mock the connection pool pool = MagicMock() pool.getconn.return_value = mock_conn mock_pool.return_value = pool - + # Mock migrations - with patch('src.migration.MigrationManager') as mock_manager: + with patch("src.migration.MigrationManager") as mock_manager: mock_manager_instance = Mock() mock_manager.return_value = mock_manager_instance - + db = Database() return db + def test_database_connection_retry(mock_conn): """Test that database connection retries on failure.""" - with patch('psycopg2.pool.SimpleConnectionPool') as mock_pool: + with patch("psycopg2.pool.SimpleConnectionPool") as mock_pool: # Make pool creation fail twice then succeed mock_pool.side_effect = [ psycopg2.Error("Test error"), psycopg2.Error("Test error"), - MagicMock() # Successful pool + MagicMock(), # Successful pool ] - + # Mock migrations - with patch('src.migration.MigrationManager') as mock_manager: + with patch("src.migration.MigrationManager") as mock_manager: mock_manager_instance = Mock() mock_manager.return_value = mock_manager_instance - with patch('time.sleep'): # Don't actually sleep in tests + with patch("time.sleep"): # Don't actually sleep in tests db = Database() # Verify pool was 
created assert db.pool is not None + def test_get_latest_pois(database, mock_conn): """Test fetching latest POI submissions.""" mock_cursor = MagicMock() mock_cursor.fetchall.return_value = [ ("poi1", "indexer1"), ("poi1", "indexer2"), - ("poi2", "indexer3") + ("poi2", "indexer3"), ] mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + result = database.get_latest_pois("deployment1", 1000) - + assert len(result) == 2 # Two unique POIs assert result["poi1"] == {"indexer1", "indexer2"} assert result["poi2"] == {"indexer3"} + def test_check_notification_sent(database, mock_conn): """Test checking if notification was already sent.""" mock_cursor = MagicMock() mock_cursor.fetchone.return_value = (True,) mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + result = database.check_notification_sent("deployment1", 1000) assert result is True + def test_record_notification(database, mock_conn): """Test recording a notification.""" mock_cursor = MagicMock() mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + database.record_notification("deployment1", 1000, "test message") - + # Verify the INSERT query was executed with correct parameters mock_cursor.execute.assert_any_call( """ @@ -92,23 +97,24 @@ def test_record_notification(database, mock_conn): SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] FROM current_pois c """, - ("deployment1", 1000, "deployment1", 1000, "test message") + ("deployment1", 1000, "deployment1", 1000, "test message"), ) mock_conn.commit.assert_called_once() + def test_cleanup_old_notifications(database, mock_conn): """Test cleaning up old notifications.""" mock_cursor = MagicMock() mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + database.cleanup_old_notifications(days=30) - + # Verify the DELETE query was executed with correct parameters mock_cursor.execute.assert_any_call( """ DELETE FROM poi_notifications WHERE sent_at < NOW() - INTERVAL '%s days' """, - (30,) + (30,), ) - mock_conn.commit.assert_called_once() \ No newline at end of file + mock_conn.commit.assert_called_once() diff --git a/services/poi_monitor/tests/test_migration.py b/services/poi_monitor/tests/test_migration.py index 3160c7a..96c71fb 100644 --- a/services/poi_monitor/tests/test_migration.py +++ b/services/poi_monitor/tests/test_migration.py @@ -1,38 +1,41 @@ import pytest -from unittest.mock import Mock, MagicMock, patch +from unittest.mock import MagicMock from src.migration import MigrationManager -import os + @pytest.fixture def mock_conn(): return MagicMock() + @pytest.fixture def manager(mock_conn): return MigrationManager(mock_conn) + def test_ensure_migration_table(manager, mock_conn): """Test migration table creation.""" mock_cursor = MagicMock() mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + # Reset the mock to clear any previous calls mock_conn.commit.reset_mock() - + manager._ensure_migration_table() - + mock_cursor.execute.assert_called_once() mock_conn.commit.assert_called_once() + def test_get_applied_migrations(manager, mock_conn): """Test fetching applied migrations.""" mock_cursor = MagicMock() mock_cursor.fetchall.return_value = [ ("001_create_notifications_table",), - ("002_another_migration",) + ("002_another_migration",), ] mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + result = manager.get_applied_migrations() assert len(result) == 2 - assert "001_create_notifications_table" in result \ No newline at end of file + assert "001_create_notifications_table" in result 
diff --git a/services/poi_monitor/tests/test_monitor.py b/services/poi_monitor/tests/test_monitor.py index 875c9f2..3201f2d 100644 --- a/services/poi_monitor/tests/test_monitor.py +++ b/services/poi_monitor/tests/test_monitor.py @@ -5,65 +5,74 @@ from src.notification import SlackNotifier from src.analyzer import PoiAnalyzer + def test_main_initializes_components(): """Test that main properly initializes all components.""" - with patch('src.monitor.Database') as mock_db_class, \ - patch('src.monitor.SlackNotifier') as mock_notifier_class, \ - patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ - patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt): # Break the loop - + with patch("src.monitor.Database") as mock_db_class, patch( + "src.monitor.SlackNotifier" + ) as mock_notifier_class, patch( + "src.monitor.PoiAnalyzer" + ) as mock_analyzer_class, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ): # Break the loop # Setup mocks mock_db = Mock(spec=Database) mock_notifier = Mock(spec=SlackNotifier) mock_analyzer = Mock(spec=PoiAnalyzer) - + mock_db_class.return_value = mock_db mock_notifier_class.return_value = mock_notifier mock_analyzer_class.return_value = mock_analyzer - + # Run main (will be interrupted by KeyboardInterrupt) main() - + # Verify components were initialized mock_db_class.assert_called_once() mock_notifier_class.assert_called_once() mock_analyzer_class.assert_called_once_with(mock_db, mock_notifier) + def test_main_handles_initialization_error(): """Test that main properly handles initialization errors.""" - with patch('src.monitor.Database', side_effect=Exception("Test error")), \ - patch('src.monitor.logger') as mock_logger: - + with patch("src.monitor.Database", side_effect=Exception("Test error")), patch( + "src.monitor.logger" + ) as mock_logger: with pytest.raises(Exception): main() - + mock_logger.error.assert_called_once() + def test_main_processes_submissions(): """Test that main calls process_new_submissions.""" - with patch('src.monitor.Database') as mock_db_class, \ - patch('src.monitor.SlackNotifier') as mock_notifier_class, \ - patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ - patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt): # Stop after first run - + with patch("src.monitor.Database"), patch( + "src.monitor.SlackNotifier" + ), patch( + "src.monitor.PoiAnalyzer" + ) as mock_analyzer_class, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ): # Stop after first run # Setup mocks mock_analyzer = Mock(spec=PoiAnalyzer) mock_analyzer_class.return_value = mock_analyzer - + # Run main main() - + # Verify process_new_submissions was called assert mock_analyzer.process_new_submissions.call_count == 1 + def test_main_handles_keyboard_interrupt(): """Test that main handles keyboard interrupt gracefully.""" - with patch('src.monitor.Database') as mock_db_class, \ - patch('src.monitor.SlackNotifier') as mock_notifier_class, \ - patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ - patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt), \ - patch('src.monitor.logger') as mock_logger: - + with patch("src.monitor.Database"), patch( + "src.monitor.SlackNotifier" + ) as _, patch( + "src.monitor.PoiAnalyzer" + ) as _, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ), patch("src.monitor.logger") as mock_logger: main() - - mock_logger.info.assert_any_call("Shutting down POI Monitor service...") \ No newline at end of file + + 
mock_logger.info.assert_any_call("Shutting down POI Monitor service...") diff --git a/services/poi_monitor/tests/test_notification.py b/services/poi_monitor/tests/test_notification.py index bfe7791..470f596 100644 --- a/services/poi_monitor/tests/test_notification.py +++ b/services/poi_monitor/tests/test_notification.py @@ -1,49 +1,48 @@ import pytest -from unittest.mock import Mock, patch +from unittest.mock import patch from src.notification import SlackNotifier import requests + @pytest.fixture def notifier(): - with patch.dict('os.environ', {'SLACK_WEBHOOK_URL': 'http://test.url'}): + with patch.dict("os.environ", {"SLACK_WEBHOOK_URL": "http://test.url"}): return SlackNotifier() + def test_send_notification_success(notifier): """Test successful notification sending.""" - with patch('requests.post') as mock_post: + with patch("requests.post") as mock_post: mock_post.return_value.raise_for_status.return_value = None - + result = notifier.send_notification("test message") - + assert result is True mock_post.assert_called_once() + def test_send_notification_failure(notifier): """Test handling of notification failure.""" - with patch('requests.post') as mock_post: + with patch("requests.post") as mock_post: mock_post.side_effect = requests.exceptions.RequestException("Test error") - + result = notifier.send_notification("test message") - + assert result is False + def test_format_poi_discrepancy_message(notifier): """Test message formatting.""" data = { - 'deployment_cid': 'Qm123', - 'block_number': 1000, - 'submissions': { - 'poi1': {'indexer1', 'indexer2'}, - 'poi2': {'indexer3'} - }, - 'reuse_info': { - 'poi2': ['Previously used in deployment X'] - } + "deployment_cid": "Qm123", + "block_number": 1000, + "submissions": {"poi1": {"indexer1", "indexer2"}, "poi2": {"indexer3"}}, + "reuse_info": {"poi2": ["Previously used in deployment X"]}, } - + message = notifier.format_poi_discrepancy_message(data) - - assert '🚨' in message - assert 'Qm123' in message - assert 'indexer1' in message - assert 'Previously used in deployment X' in message \ No newline at end of file + + assert "🚨" in message + assert "Qm123" in message + assert "indexer1" in message + assert "Previously used in deployment X" in message From df6756b9dada18d3b9695624fefde4cae5b956db Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Fri, 10 Jan 2025 17:20:14 +0000 Subject: [PATCH 4/5] address CI failure --- .github/workflows/test.yml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 09f5e7b..089fd38 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,13 +1,17 @@ -name: Tests +name: POI Monitor Tests on: push: + paths: + - 'services/poi_monitor/**' branches: [ main ] pull_request: + paths: + - 'services/poi_monitor/**' branches: [ main ] jobs: - test: + poi-monitor-test: runs-on: ubuntu-latest services: @@ -43,7 +47,8 @@ jobs: - name: Run tests run: | cd services/poi_monitor - pytest tests/ --cov=src --cov-report=xml + pip install -e . 
+        pytest tests/ --cov=src --cov-report=term-missing
       env:
         POSTGRES_HOST: localhost
         POSTGRES_PORT: 5433
         POSTGRES_USER: postgres
         POSTGRES_PASSWORD: password
         POSTGRES_DB: graphix

From b428df246bcfda93745d5ba968a8731ef4834df1 Mon Sep 17 00:00:00 2001
From: MoonBoi9001
Date: Sat, 11 Jan 2025 12:47:46 +0000
Subject: [PATCH 5/5] Remove compose/.env from git tracking

---
 .gitignore   | 1 +
 compose/.env | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)
 delete mode 100644 compose/.env

diff --git a/.gitignore b/.gitignore
index 59ecb0b..efd1ede 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,6 +32,7 @@ ENV/
 
 # Environment variables
 .env
+compose/.env
 
 # Coverage
 .coverage
diff --git a/compose/.env b/compose/.env
deleted file mode 100644
index 02ef633..0000000
--- a/compose/.env
+++ /dev/null
@@ -1 +0,0 @@
-GRAPHIX_DB_URL=postgresql://postgres:password@localhost:5433/graphix