diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..089fd38 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,62 @@ +name: POI Monitor Tests + +on: + push: + paths: + - 'services/poi_monitor/**' + branches: [ main ] + pull_request: + paths: + - 'services/poi_monitor/**' + branches: [ main ] + +jobs: + poi-monitor-test: + runs-on: ubuntu-latest + + services: + postgres: + image: postgres + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: graphix + ports: + - 5433:5433 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + cd services/poi_monitor + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt + + - name: Run tests + run: | + cd services/poi_monitor + pip install -e . + pytest tests/ --cov=src --cov-report=term-missing + env: + POSTGRES_HOST: localhost + POSTGRES_PORT: 5433 + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: graphix + + - name: Upload coverage report + uses: codecov/codecov-action@v3 + with: + file: ./services/poi_monitor/coverage.xml \ No newline at end of file diff --git a/.gitignore b/.gitignore index f9ffaa5..efd1ede 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,65 @@ -rustc-ice* +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST +venv/ +.venv/ +ENV/ -### IDEs -.vscode/settings.json +# IDEs .idea/ +.vscode/ +*.swp +*.swo + +# Environment variables +.env +compose\.env + +# Coverage +.coverage +coverage.xml +htmlcov/ + +# Logs +*.log + +# Database +*.sqlite3 -### OS +# Rust +/target/ + +# OS specific .DS_Store +Thumbs.db + +# Debug files +rustc-ice* + +# Docker and Compose +compose/grafana/data/ +compose/data/ +compose/prometheus/data/ +*.db -### Rust -/target +# Environment variables +compose/*.env diff --git a/compose/.env b/compose/.env deleted file mode 100644 index 02ef633..0000000 --- a/compose/.env +++ /dev/null @@ -1 +0,0 @@ -GRAPHIX_DB_URL=postgresql://postgres:password@localhost:5433/graphix diff --git a/compose/.env.example b/compose/.env.example new file mode 100644 index 0000000..b9da60b --- /dev/null +++ b/compose/.env.example @@ -0,0 +1,8 @@ +GRAPHIX_DB_URL=postgresql://postgres:password@localhost:5433/graphix +POSTGRES_DB=graphix +POSTGRES_USER=postgres +POSTGRES_PASSWORD=password +POSTGRES_PORT=5433 +CHECK_INTERVAL=300 +GRAPHIX_API_URL=http://host.docker.internal:8000/graphql +SLACK_WEBHOOK_URL=your_webhook_url_here diff --git a/compose/dev.dependencies.yml b/compose/dev.dependencies.yml new file mode 100644 index 0000000..675213e --- /dev/null +++ b/compose/dev.dependencies.yml @@ -0,0 +1,40 @@ +version: "3" + +services: + postgres: + image: postgres + restart: unless-stopped + ports: + - "${POSTGRES_PORT:-5433}:${POSTGRES_PORT:-5433}" + environment: + POSTGRES_USER: ${POSTGRES_USER:-postgres} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-password} + POSTGRES_DB: ${POSTGRES_DB:-graphix} + PGDATA: "/var/lib/postgresql/data" + command: -p ${POSTGRES_PORT:-5433} + healthcheck: + test: ["CMD-SHELL", "pg_isready -h localhost -p ${POSTGRES_PORT:-5433} -U ${POSTGRES_USER:-postgres}"] + interval: 1s + timeout: 1s + retries: 1000 + volumes: + - ./data/postgres:/var/lib/postgresql/data + + poi-monitor: + build: + context: ../services/poi_monitor + dockerfile: Dockerfile + environment: + - POSTGRES_DB=${POSTGRES_DB:-graphix} + - POSTGRES_USER=${POSTGRES_USER:-postgres} + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} + - POSTGRES_HOST=postgres + - POSTGRES_PORT=${POSTGRES_PORT:-5433} + - GRAPHIX_API_URL=${GRAPHIX_API_URL:-http://host.docker.internal:8000/graphql} + - SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL} + - CHECK_INTERVAL=${CHECK_INTERVAL:-300} + depends_on: + postgres: + condition: service_healthy + extra_hosts: + - "host.docker.internal:host-gateway" \ No newline at end of file diff --git a/services/poi_monitor/Dockerfile b/services/poi_monitor/Dockerfile new file mode 100644 index 0000000..6ca21ca --- /dev/null +++ b/services/poi_monitor/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +COPY requirements-dev.txt . +RUN pip install --no-cache-dir -r requirements.txt +RUN if [ "${ENVIRONMENT}" = "development" ]; then pip install --no-cache-dir -r requirements-dev.txt; fi + +# Copy application code +COPY src/ src/ +COPY migrations/ migrations/ + +# Set environment variables +ENV PYTHONUNBUFFERED=1 + +# Run the monitor +CMD ["python", "-m", "src.monitor"] diff --git a/services/poi_monitor/README.md b/services/poi_monitor/README.md new file mode 100644 index 0000000..3a6d1e4 --- /dev/null +++ b/services/poi_monitor/README.md @@ -0,0 +1,205 @@ +# POI Monitor Service + +## Description + +A service that monitors Proof of Indexing (POI) submissions across indexers in The Graph network, +detecting discrepancies and POI reuse and forwarding notifications to Slack when an issue is detected. + +## Features + +- Monitors POI submissions in real-time +- Detects when indexers submit different POIs for the same deployment/block +- Identifies POI reuse across different deployments/blocks +- Sends notifications to Slack when issues are found +- Prevents duplicate notifications for known issues +- Uses connection pooling for efficient database access +- Handles service restarts gracefully + +## Setup + +### Prerequisites + +- Python 3.12+ +- PostgreSQL +- Docker (for containerized deployment) +- Slack webhook URL + +### Environment Variables + +Create a `.env` file with: +``` +POSTGRES_DB=graphix +POSTGRES_USER=postgres +POSTGRES_PASSWORD=password +POSTGRES_HOST=localhost +POSTGRES_PORT=5433 +GRAPHIX_API_URL=http://localhost:8000/graphql +SLACK_WEBHOOK_URL=your_webhook_url_here # Required to run the service for notifications in Slack +CHECK_INTERVAL=300 # seconds between checks +``` + +### Local Development + +1. Create a virtual environment: +```bash +# Create and activate virtual environment +python -m venv venv +# For Git Bash on Windows: +source venv/Scripts/activate +# For Linux/Mac: +source venv/bin/activate +``` + +2. Install dependencies: +```bash +# With virtual environment activated: +pip install -r services/poi_monitor/requirements.txt +pip install -r services/poi_monitor/requirements-dev.txt # For development/testing +# Install package in editable mode (for development tools like pytest, mypy, etc.) +pip install -e services/poi_monitor +``` + +## Running the Service + +### Using Docker Compose + +The POI monitor requires several steps to run: + +```bash +# 1. Start postgres database +docker-compose -f compose/dev.dependencies.yml up -d postgres +# pgAdmin 4 should now be able to connect to the database +# localhost:8000/graphql should not be accessible yet + +# 2. Before we start the graphix service, we need to set the database URL: +export GRAPHIX_DB_URL=postgresql://postgres:password@localhost:5433/graphix + +# 3. Build and start the graphix service +cargo build +./target/debug/graphix --database-url postgresql://postgres:password@localhost:5433/graphix +# The graphix service should now be accessible at http://localhost:8000/graphql + +# 4. In a separate Git Bash terminal, build and start the POI monitor: +docker-compose -f compose/dev.dependencies.yml build poi-monitor # Build the POI monitor service +docker-compose -f compose/dev.dependencies.yml up poi-monitor # Start the POI monitor service +``` + +### Running Tests + +```bash +pytest services/poi_monitor/tests/ --cov=services/poi_monitor/src --cov-report=term-missing +``` + +### Project Structure +``` +graphix/ # Root project directory +├── Dockerfile # Main graphix service Dockerfile +├── compose/ +│ ├── dependencies.yml # Base services configuration +│ ├── dev.dependencies.yml # POI monitor service configuration +│ └── .env # Environment variables for Docker +└── services/ + └── poi_monitor/ # POI Monitor service + ├── Dockerfile # POI monitor service Dockerfile + ├── src/ # Source code + ├── tests/ # Test files + ├── migrations/ # Database migrations + └── .env # Local environment variables +``` + +## Restarting the Service + +Note: After making code changes, rebuild and restart the service: +```bash +docker-compose -f compose/dev.dependencies.yml build poi-monitor +docker-compose -f compose/dev.dependencies.yml up poi-monitor +``` + +For restarting without code changes (e.g., after updating environment variables): +```bash +docker-compose -f compose/dev.dependencies.yml restart poi-monitor +``` + +## Notifications + +When a POI discrepancy is detected, a Slack message is sent with: +- Deployment CID and block number +- Different POI hashes submitted +- Which indexers submitted each POI +- Any instances of POI reuse from other deployments + +Example notification: +``` +🚨 *POI Discrepancy Detected* +*Deployment:* QmYyB6sr2366Vw2mcWBXy2pTwqJKNkqyZnmxPeQJGHyXav +*Block:* 1234567 + +*Submitted POIs:* +• Hash: 0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef +*Submitted by:* 0x1234567890abcdef1234567890abcdef12345678 + +• Hash: 0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890 +Previously used 5 days ago: +• Network: mainnet +• Deployment: QmXyz789abcdef1234567890abcdef1234567890abcdef1234567890abcd +• Block: 1234000 +• Indexer: 0xabcdef1234567890abcdef1234567890abcdef1234 +``` + +### Technical Implementation +The POI monitor service compares POIs at the deployment level, where POIs for each deployment are only compared +when they share the same block height. This means we can effectively compare POIs across different networks, even +when their chain heads are at vastly different heights (e.g., Arbitrum at block 15M vs Ethereum at block 5M). +What matters is that for any given deployment, all POIs being compared were generated at the same block height +for that specific deployment. + +1. Deployment-Scoped Comparisons: + ```python + def analyze_pois(self, deployment_id: str, block_number: int): + # POIs are only compared within the same [deployment, block-height] combination + ``` + +2. Independent Block Heights: + - Each deployment is processed independently + - Block numbers are compared within the same deployment + - Different networks (e.g., Arbitrum at block 15M vs Ethereum at block 5M) + are handled correctly because their deployments are different + +3. POI Reuse Detection: + - Checks for POI reuse by comparing POI hashes across all deployments + - Does not explicitly differentiate between networks + - Deployment IDs are naturally network-specific because they include: + - The network configuration in the manifest + - Network-specific data sources + - Other chain-specific parameters + +### Limitations + +The POI monitor service focuses on the most recent POI submissions and has some limitations regarding historical data: + +- The service processes POIs at the latest block heights it finds for each deployment. +- If multiple discrepancies exist across different block heights, only recent ones will be detected +- Therefore historical discrepancies may be missed if they occurred during service downtime + +For example: +- The service was live, running during blocks 0 to 220,000,000 +- The servce goes offline for some large number of blocks. e.g. 100,000,000 blocks +- If POI discrepancies may exist at blocks: + - 230,000,000 + - 240,000,000 + - 250,000,000 + ... + - 310,000,000 + - 320,000,000 +- The service will only detect and notify about discrepancies at the latest blocks (e.g., 310M, 320M). +- Historical discrepancies at blocks 230M, 240M, 250M, etc. will not be detected + +This behavior is acceptable for normal operation because: +1. Epochs typically last 24 hours +2. The service checks for discrepancies every few minutes (configurable via CHECK_INTERVAL) +3. Under normal operation, no discrepancies should be missed + +However, developers should be aware that: +- If the service experiences downtime +- And POI discrepancies occur during that downtime +- Those historical discrepancies might not be detected when service resumes diff --git a/services/poi_monitor/migrations/001_create_notifications_table.py b/services/poi_monitor/migrations/001_create_notifications_table.py new file mode 100644 index 0000000..ca63f14 --- /dev/null +++ b/services/poi_monitor/migrations/001_create_notifications_table.py @@ -0,0 +1,51 @@ +import logging +from typing import Optional +import psycopg2 + +logger = logging.getLogger(__name__) + + +def migrate_up(conn: Optional[psycopg2.extensions.connection] = None) -> None: + """Create the poi_notifications table.""" + + up_sql = """ + CREATE TABLE IF NOT EXISTS poi_notifications ( + id SERIAL PRIMARY KEY, + deployment_id TEXT NOT NULL, + block_number BIGINT NOT NULL, + message TEXT NOT NULL, + sent_at TIMESTAMP NOT NULL DEFAULT NOW(), + UNIQUE(deployment_id, block_number) + ); + + CREATE INDEX IF NOT EXISTS idx_poi_notifications_sent_at + ON poi_notifications(sent_at); + """ + + try: + with conn.cursor() as cur: + cur.execute(up_sql) + conn.commit() + logger.info("Successfully created poi_notifications table") + except Exception as e: + conn.rollback() + logger.error(f"Failed to create poi_notifications table: {str(e)}") + raise + + +def migrate_down(conn: Optional[psycopg2.extensions.connection] = None) -> None: + """Remove the poi_notifications table.""" + + down_sql = """ + DROP TABLE IF EXISTS poi_notifications; + """ + + try: + with conn.cursor() as cur: + cur.execute(down_sql) + conn.commit() + logger.info("Successfully dropped poi_notifications table") + except Exception as e: + conn.rollback() + logger.error(f"Failed to drop poi_notifications table: {str(e)}") + raise diff --git a/services/poi_monitor/migrations/003_add_poi_set_column.sql b/services/poi_monitor/migrations/003_add_poi_set_column.sql new file mode 100644 index 0000000..0a50365 --- /dev/null +++ b/services/poi_monitor/migrations/003_add_poi_set_column.sql @@ -0,0 +1,12 @@ +-- Add poi_set column +ALTER TABLE poi_notifications +ADD COLUMN IF NOT EXISTS poi_set BYTEA[]; + +-- Update existing rows with empty array +UPDATE poi_notifications +SET poi_set = ARRAY[]::bytea[] +WHERE poi_set IS NULL; + +-- Make column not null after filling defaults +ALTER TABLE poi_notifications +ALTER COLUMN poi_set SET NOT NULL; \ No newline at end of file diff --git a/services/poi_monitor/requirements-dev.txt b/services/poi_monitor/requirements-dev.txt new file mode 100644 index 0000000..ca770e4 --- /dev/null +++ b/services/poi_monitor/requirements-dev.txt @@ -0,0 +1,4 @@ +pytest==8.0.0 +pytest-cov==4.1.0 +pytest-mock==3.12.0 +ruff==0.3.0 \ No newline at end of file diff --git a/services/poi_monitor/requirements.txt b/services/poi_monitor/requirements.txt new file mode 100644 index 0000000..759d70b --- /dev/null +++ b/services/poi_monitor/requirements.txt @@ -0,0 +1,6 @@ +requests==2.31.0 +python-dotenv==1.0.0 +psycopg2-binary==2.9.9 +prometheus-client==0.19.0 +pyyaml==6.0.1 +python-json-logger==2.0.7 diff --git a/services/poi_monitor/setup.py b/services/poi_monitor/setup.py new file mode 100644 index 0000000..b0584c8 --- /dev/null +++ b/services/poi_monitor/setup.py @@ -0,0 +1,15 @@ +from setuptools import setup, find_packages + +setup( + name="poi_monitor", + version="0.1", + packages=find_packages(), + install_requires=[ + "requests", + "python-dotenv", + "psycopg2-binary", + "prometheus-client", + "pyyaml", + "python-json-logger", + ], +) diff --git a/services/poi_monitor/src/__init__.py b/services/poi_monitor/src/__init__.py new file mode 100644 index 0000000..7ff37fb --- /dev/null +++ b/services/poi_monitor/src/__init__.py @@ -0,0 +1,21 @@ +""" +POI Monitor Service + +A service that monitors Proof of Indexing (POI) submissions across indexers in The Graph network, +detecting discrepancies and POI reuse. + +Components: +- analyzer: POI discrepancy detection +- database: PostgreSQL connection management +- notification: Slack integration +- monitor: Main service loop +- migration: Database schema management +""" + +from .analyzer import PoiAnalyzer +from .database import Database +from .notification import SlackNotifier +from .monitor import main + +__version__ = "0.1.0" +__all__ = ["PoiAnalyzer", "Database", "SlackNotifier", "main"] diff --git a/services/poi_monitor/src/analyzer.py b/services/poi_monitor/src/analyzer.py new file mode 100644 index 0000000..9a96ac2 --- /dev/null +++ b/services/poi_monitor/src/analyzer.py @@ -0,0 +1,308 @@ +# Import external libraries +import logging +from typing import Dict, Set, List, Optional +import os +import requests + +# Import internal modules +from .database import Database +from .notification import SlackNotifier + +# Configure logging +logger = logging.getLogger(__name__) + + +class PoiAnalyzer: + """Class to analyze POI submissions and notify slack if discrepancies are found.""" + + def __init__(self, database: Database, notifier: SlackNotifier): + """Initialize the POI analyzer with a database and notifier. + These are internal modules that are injected into the class.""" + self.db = database + self.notifier = notifier + self.page_size = 100 # Default page size for pagination + + def analyze_pois(self, deployment_id: str, block_number: int) -> Optional[Dict]: + """Analyze POI submissions and detect discrepancies between indexers. + + This method checks if different indexers have submitted different POI (Proof of Indexing) + hashes for the same deployment and block. A discrepancy indicates that indexers + disagree about the correct POI value. + + Params: + deployment_id: The deployment CID to analyze + block_number: The blockchain block number to analyze + + Returns: + Optional[Dict]: Returns None if: + - Notification was already sent + - No POI submissions exist + - Only one unique POI hash exists (no discrepancy) + + Returns a dictionary containing discrepancy details if multiple different + POI hashes are found, with structure: + { + 'deployment_cid': str, + 'block_number': int, + 'submissions': Dict[str, Set[str]], # POI hash -> set of indexer addresses + 'reuse_info': Dict[str, List[str]] # POI reuse history + } + + Raises: + Exception: If there's an error accessing the database or processing submissions + """ + try: + # Skip if we've already notified about this deployment/block + if self.db.check_notification_sent(deployment_id, block_number): + logger.debug( + f"Already notified about {deployment_id} at block {block_number}" + ) + return None + + # Get all POI submissions for this deployment/block + poi_submissions = self.db.get_latest_pois(deployment_id, block_number) + + if not poi_submissions: + logger.debug( + f"No POI submissions found for {deployment_id} at block {block_number}" + ) + return None + + # If there's only one POI hash, there's no discrepancy + if len(poi_submissions) == 1: + logger.debug( + f"All indexers agree on POI for {deployment_id} at block {block_number}" + ) + return None + + # We have a discrepancy - format the data + discrepancy_data = { + "deployment_cid": deployment_id, + "block_number": block_number, + "submissions": poi_submissions, + "reuse_info": self._check_poi_reuse(poi_submissions), + } + + return discrepancy_data + + except Exception as e: + logger.error(f"Error analyzing POIs: {str(e)}", exc_info=True) + raise + + def _check_poi_reuse( + self, submissions: Dict[str, Set[str]] + ) -> Dict[str, List[str]]: + """Check if any POIs have been reused from other blocks/deployments. + + Params: + submissions: Dictionary mapping POI hashes to sets of indexer addresses + + Returns: + Dict mapping POI hashes to lists of reuse information + """ + reuse_info = {} + + query = """ + SELECT + p.poi, + d.ipfs_cid as deployment_id, + b.number as block_number, + i.address as indexer_address, + n.name as network_name, + p.created_at as submission_time + FROM pois p + JOIN sg_deployments d ON d.id = p.sg_deployment_id + JOIN blocks b ON b.id = p.block_id + JOIN indexers i ON i.id = p.indexer_id + JOIN networks n ON n.id = d.network + WHERE p.poi = ANY(%s) + ORDER BY p.created_at DESC + """ + + try: + with self.db.get_connection() as conn: + with conn.cursor() as cur: + poi_hashes = list(submissions.keys()) + cur.execute(query, (poi_hashes,)) + results = cur.fetchall() + + # Group results by POI hash + poi_occurrences = {} + for row in results: + ( + poi_hash, + deployment_id, + block_number, + indexer_addr, + network, + timestamp, + ) = row + if poi_hash not in poi_occurrences: + poi_occurrences[poi_hash] = [] + poi_occurrences[poi_hash].append( + { + "deployment_id": deployment_id, + "block_number": block_number, + "indexer_address": indexer_addr, + "network": network, + "timestamp": timestamp, + } + ) + + # Format detailed reuse information + for poi_hash, occurrences in poi_occurrences.items(): + if len(occurrences) > 1: # POI appears more than once + reuse_info[poi_hash] = [] + + # Sort by timestamp descending + occurrences.sort(key=lambda x: x["timestamp"], reverse=True) + + # First occurrence is current + current = occurrences[0] + + # Add details for each previous use + for prev in occurrences[1:]: + time_diff = current["timestamp"] - prev["timestamp"] + days_ago = time_diff.days + + reuse_info[poi_hash].append( + f"Previously used {days_ago} days ago:\n" + f"• Network: {prev['network']}\n" + f"• Deployment: {prev['deployment_id']}\n" + f"• Block: {prev['block_number']}\n" + f"• Indexer: {prev['indexer_address']}" + ) + + return reuse_info + + except Exception as e: + logger.error(f"Error checking POI reuse: {str(e)}", exc_info=True) + return {} + + def process_new_submissions(self) -> None: + """Process any new POI submissions and send notifications for discrepancies.""" + try: + recent_submissions = self._get_recent_submissions() + + for deployment_id, block_number in recent_submissions: + try: + discrepancy = self.analyze_pois(deployment_id, block_number) + + if discrepancy: + # Format and send notification + message = self.notifier.format_poi_discrepancy_message( + discrepancy + ) + if self.notifier.send_notification(message): + # Record that we sent the notification + self.db.record_notification( + deployment_id, block_number, message + ) + except Exception as e: + # Log the error but continue processing other submissions + logger.error( + f"Error processing submission for deployment {deployment_id} at block {block_number}: {str(e)}", + exc_info=True, + ) + continue + + # Cleanup old POI notifications from the database + self.db.cleanup_old_notifications(days=60) + + except Exception as e: + logger.error(f"Error processing submissions: {str(e)}", exc_info=True) + raise + + def _get_recent_submissions(self) -> List[tuple[str, int]]: + """Get list of recent deployment/block combinations to check.""" + graphql_url = os.getenv("GRAPHIX_API_URL", "http://localhost:8000/graphql") + submissions = set() + + query = """ + query { + poiAgreementRatios( + indexerAddress: "%s" + ) { + poi { + hash + block { + number + } + deployment { + cid + } + indexer { + address + } + } + } + } + """ + + try: + indexers = self._get_indexers() + if not indexers: + logger.error("No indexers found") + return [] + + for indexer_address in indexers: + logger.debug(f"Fetching POIs for indexer {indexer_address}") + current_query = query % indexer_address + + response = requests.post( + graphql_url, json={"query": current_query}, timeout=10 + ) + response.raise_for_status() + data = response.json() + + if "errors" in data: + logger.error(f"GraphQL errors: {data['errors']}") + break + + if "data" not in data or "poiAgreementRatios" not in data["data"]: + logger.error("Unexpected GraphQL response format") + break + + # Extract POIs from current page + agreements = data["data"]["poiAgreementRatios"] + for agreement in agreements: + submissions.add( + ( + agreement["poi"]["deployment"]["cid"], + agreement["poi"]["block"]["number"], + ) + ) + + return list(submissions) + + except requests.exceptions.RequestException as e: + logger.error(f"Failed to fetch recent submissions: {str(e)}") + return [] + + def _get_indexers(self) -> List[str]: + """Get list of indexer addresses.""" + query = """ + query { + indexers( + limit: %d + ) { + address + } + } + """ + try: + all_indexers = [] + current_query = query % self.page_size + logger.debug(f"Fetching indexers with limit {self.page_size}") + response = requests.post( + os.getenv("GRAPHIX_API_URL"), json={"query": current_query}, timeout=10 + ) + data = response.json() + if "data" in data and "indexers" in data["data"]: + indexers = data["data"]["indexers"] + all_indexers.extend([indexer["address"] for indexer in indexers]) + return all_indexers + return [] + except Exception as e: + logger.error(f"Error getting indexers: {str(e)}") + return [] diff --git a/services/poi_monitor/src/database.py b/services/poi_monitor/src/database.py new file mode 100644 index 0000000..df4a4b0 --- /dev/null +++ b/services/poi_monitor/src/database.py @@ -0,0 +1,203 @@ +import os +import logging +from typing import Dict, Set +from psycopg2.pool import SimpleConnectionPool +from contextlib import contextmanager +from dotenv import load_dotenv + +logger = logging.getLogger(__name__) +load_dotenv() + + +class Database: + """ + This class is the Database manager for POI monitoring. + + This class manages database connections and provides methods for: + - Tracking POI submissions across indexers + - Managing slack notification history + - Handling database migrations (schema updates) + """ + + def __init__(self): + """ + Constructor initializes database connection and runs migrations. + + Raises: + psycopg2.Error: If database connection cannot be established + Exception: If migrations fail to apply + """ + # Create the connection pool first + self.pool = SimpleConnectionPool( + minconn=1, + maxconn=10, + dbname=os.getenv("POSTGRES_DB", "graphix"), + user=os.getenv("POSTGRES_USER", "postgres"), + password=os.getenv("POSTGRES_PASSWORD", "password"), + host=os.getenv("POSTGRES_HOST", "localhost"), + port=os.getenv("POSTGRES_PORT", "5433"), + ) + + # Get initial connection for migrations + with self.get_connection() as conn: + self.conn = conn # Store temporary reference for migrations + self._run_migrations() + self.conn = None # Remove reference after migrations + + @contextmanager + def get_connection(self): + """ + Get a database connection from the pool with automatic cleanup (this is faster than creating a + new connetion each time we want to talk to the db) + + - Automatically returns connection to pool after use + - Handles cleanup even if an exception occurs + """ + conn = self.pool.getconn() + try: + yield conn + finally: + self.pool.putconn(conn) + + def get_latest_pois( + self, deployment_id: str, block_number: int + ) -> Dict[str, Set[str]]: + """Fetch all indexer POI submissions for a specific deployment and block. + + Retrieves all POI submissions and the indexers that submitted them for + a given deployment at a specific block number. + + Params: + deployment_id: The IPFS CID of the subgraph deployment + block_number: The block number to verify POIs against + + Returns: + A dictionary mapping POI hashes to sets of indexer addresses. + Example: + { + "0xabc...": {"0x123...", "0x456..."}, + "0xdef...": {"0x789..."} + } + """ + query = """ + SELECT p.poi, i.address + FROM pois p + JOIN indexers i ON i.id = p.indexer_id + JOIN blocks b ON b.id = p.block_id + JOIN sg_deployments d ON d.id = p.sg_deployment_id + WHERE d.ipfs_cid = %s AND b.number = %s + """ + + with self.get_connection() as conn: + with conn.cursor() as cur: + cur.execute(query, (deployment_id, block_number)) + results = cur.fetchall() + + poi_submissions = {} + for poi_hash, indexer_addr in results: + if poi_hash not in poi_submissions: + poi_submissions[poi_hash] = set() + poi_submissions[poi_hash].add(indexer_addr) + + return poi_submissions + + def check_notification_sent(self, deployment_id: str, block_number: int) -> bool: + """Check if we've already notified about the current set of POIs for this deployment/block. + + Params: + deployment_id: The deployment CID + block_number: The block number + + Returns: + bool: True if we've already notified about these exact POIs + """ + query = """ + WITH current_pois AS ( + SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set + FROM pois p + JOIN blocks b ON b.id = p.block_id + JOIN sg_deployments d ON d.id = p.sg_deployment_id + WHERE d.ipfs_cid = %s AND b.number = %s + ) + SELECT EXISTS ( + SELECT 1 + FROM poi_notifications n + CROSS JOIN current_pois c + WHERE n.deployment_id = %s + AND n.block_number = %s + AND n.poi_set = c.poi_set + ) + """ + + with self.get_connection() as conn: + with conn.cursor() as cur: + cur.execute( + query, (deployment_id, block_number, deployment_id, block_number) + ) + return cur.fetchone()[0] + + def record_notification( + self, deployment_id: str, block_number: int, message: str + ) -> None: + """Record that a notification was sent. Later used to prevent duplicate notifications/spam. + + Params: + deployment_id: The deployment IPFS hash + block_number: The block number + message: The notification message that was sent + """ + + query = """ + WITH current_pois AS ( + SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set + FROM pois p + JOIN blocks b ON b.id = p.block_id + JOIN sg_deployments d ON d.id = p.sg_deployment_id + WHERE d.ipfs_cid = %s AND b.number = %s + ) + INSERT INTO poi_notifications (deployment_id, block_number, message, sent_at, poi_set) + SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] + FROM current_pois c + """ + + with self.get_connection() as conn: + with conn.cursor() as cur: + cur.execute( + query, + (deployment_id, block_number, deployment_id, block_number, message), + ) + conn.commit() + + def cleanup_old_notifications(self, days: int = 60) -> None: + """Remove notification records older than specified days.""" + query = """ + DELETE FROM poi_notifications + WHERE sent_at < NOW() - INTERVAL '%s days' + """ + + with self.get_connection() as conn: + with conn.cursor() as cur: + cur.execute(query, (days,)) + conn.commit() + + def _run_migrations(self): + """Run any pending database migrations.""" + from .migration import MigrationManager + + try: + manager = MigrationManager(self.conn) + manager.apply_migrations() + + # Verify table structure + with self.conn.cursor() as cur: + cur.execute(""" + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = 'poi_notifications' + """) + columns = cur.fetchall() + logger.info(f"poi_notifications table structure: {columns}") + + except Exception as e: + logger.error(f"Failed to run migrations: {str(e)}") + raise diff --git a/services/poi_monitor/src/migration.py b/services/poi_monitor/src/migration.py new file mode 100644 index 0000000..926e954 --- /dev/null +++ b/services/poi_monitor/src/migration.py @@ -0,0 +1,86 @@ +import os +import logging +import importlib +from typing import List +import psycopg2 + +logger = logging.getLogger(__name__) + + +class MigrationManager: + def __init__(self, conn: psycopg2.extensions.connection): + self.conn = conn + self._ensure_migration_table() + + def _ensure_migration_table(self): + """Create the migrations tracking table if it doesn't exist.""" + sql = """ + CREATE TABLE IF NOT EXISTS poi_monitor_migrations ( + id SERIAL PRIMARY KEY, + migration_name TEXT NOT NULL UNIQUE, + applied_at TIMESTAMP NOT NULL DEFAULT NOW() + ); + """ + with self.conn.cursor() as cur: + cur.execute(sql) + self.conn.commit() + + def get_applied_migrations(self) -> List[str]: + """Get list of already applied migrations.""" + sql = "SELECT migration_name FROM poi_monitor_migrations ORDER BY id;" + with self.conn.cursor() as cur: + cur.execute(sql) + return [row[0] for row in cur.fetchall()] + + def apply_migrations(self): + """Apply all pending migrations.""" + applied = set(self.get_applied_migrations()) + + # Get all migration files + migrations_dir = os.path.join(os.path.dirname(__file__), "..", "migrations") + logger.info(f"Looking for migrations in: {migrations_dir}") + migration_files = sorted( + [ + f + for f in os.listdir(migrations_dir) + if (f.endswith(".py") and f != "__init__.py") or f.endswith(".sql") + ] + ) + logger.info(f"Found migration files: {migration_files}") + + for migration_file in migration_files: + migration_name = migration_file[:-3] + + if migration_name in applied: + logger.info(f"Skipping already applied migration: {migration_name}") + continue + + logger.info(f"Applying migration: {migration_name}") + + try: + if migration_file.endswith(".sql"): + # Handle SQL files + with open(os.path.join(migrations_dir, migration_file)) as f: + sql = f.read() + with self.conn.cursor() as cur: + cur.execute(sql) + self.conn.commit() + else: + # Import and run Python migrations + module = importlib.import_module(f"migrations.{migration_name}") + module.migrate_up(self.conn) + + # Record the migration + with self.conn.cursor() as cur: + cur.execute( + "INSERT INTO poi_monitor_migrations (migration_name) VALUES (%s)", + (migration_name,), + ) + self.conn.commit() + + logger.info(f"Successfully applied migration: {migration_name}") + + except Exception as e: + self.conn.rollback() + logger.error(f"Failed to apply migration {migration_name}: {str(e)}") + raise diff --git a/services/poi_monitor/src/monitor.py b/services/poi_monitor/src/monitor.py new file mode 100644 index 0000000..9b6d3cd --- /dev/null +++ b/services/poi_monitor/src/monitor.py @@ -0,0 +1,58 @@ +import os +import time +import logging +from logging.config import dictConfig +from dotenv import load_dotenv +from .database import Database +from .notification import SlackNotifier +from .analyzer import PoiAnalyzer + +# Load environment variables +load_dotenv() + +# Configure logging +logging_config = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "json": { + "()": "pythonjsonlogger.jsonlogger.JsonFormatter", + "format": "%(asctime)s %(levelname)s %(name)s %(message)s", + } + }, + "handlers": {"console": {"class": "logging.StreamHandler", "formatter": "json"}}, + "root": {"handlers": ["console"], "level": "INFO"}, +} + +dictConfig(logging_config) +logger = logging.getLogger(__name__) + + +def main(): + logger.info("Starting POI Monitor service...") + + # Initialize components + try: + db = Database() + notifier = SlackNotifier() + analyzer = PoiAnalyzer(db, notifier) + logger.info("Successfully initialized all components") + except Exception as e: + logger.error(f"Failed to initialize components: {str(e)}") + raise + + try: + while True: + logger.info("Running POI check iteration") + analyzer.process_new_submissions() + time.sleep(int(os.getenv("CHECK_INTERVAL", 300))) # Default 5 minutes + + except KeyboardInterrupt: + logger.info("Shutting down POI Monitor service...") + except Exception as e: + logger.error(f"Unexpected error: {str(e)}", exc_info=True) + raise + + +if __name__ == "__main__": + main() diff --git a/services/poi_monitor/src/notification.py b/services/poi_monitor/src/notification.py new file mode 100644 index 0000000..dfeb833 --- /dev/null +++ b/services/poi_monitor/src/notification.py @@ -0,0 +1,91 @@ +import os +import logging +import requests +from typing import Dict, Any + +logger = logging.getLogger(__name__) + + +class SlackNotifier: + """This class is used to send notifications to a Slack channel.""" + + def __init__(self, webhook_url: str = None): + """ + Initializes the SlackNotifier with a webhook URL. + + Params: + webhook_url: The Slack webhook URL + + Raises: + ValueError: If the Slack webhook URL is not provided + """ + self.webhook_url = webhook_url or os.getenv("SLACK_WEBHOOK_URL") + if not self.webhook_url: + raise ValueError("Slack webhook URL not provided") + + def send_notification(self, message: str) -> bool: + """Send a notification to Slack. + + Params: + message: The formatted message to send + + Returns: + bool: True if the message was sent successfully + """ + try: + response = requests.post( + self.webhook_url, json={"text": message}, timeout=10 + ) + response.raise_for_status() + logger.info("Successfully sent Slack notification") + return True + + except requests.exceptions.RequestException as e: + logger.error(f"Failed to send Slack notification: {str(e)}") + return False + + def format_poi_discrepancy_message(self, data: Dict[str, Any]) -> str: + """Format POI discrepancy data into a Slack message. + + Params: + data: Dictionary containing POI discrepancy information + + Returns: + str: Formatted message ready to send to Slack + """ + message_parts = [ + "🚨 *New POI Discrepancy Found*", + f"*Deployment:* `{data['deployment_cid']}`", + f"*Block:* `{data['block_number']}`", + "*POI Submissions:*", + ] + + for poi_hash, indexers in data["submissions"].items(): + # Convert memoryview to hex string if needed + if isinstance(poi_hash, memoryview): + poi_hash = poi_hash.hex() + + submission_parts = [f"*POI Hash:* `{poi_hash}`"] + + # Add POI reuse information if available + if "reuse_info" in data and poi_hash in data["reuse_info"]: + reuse_data = data["reuse_info"][poi_hash] + submission_parts.append("⚠️ *POI Reuse:*") + for detail in reuse_data: + submission_parts.append(f" • {detail}") + + # Convert indexer addresses if they're memoryview + formatted_indexers = [] + for indexer in indexers: + if isinstance(indexer, memoryview): + formatted_indexers.append(indexer.hex()) + else: + formatted_indexers.append(str(indexer)) + + submission_parts.append( + f"*Submitted by:* `{', '.join(sorted(formatted_indexers))}`" + ) + message_parts.extend(submission_parts) + message_parts.append("") # Add spacing between submissions + + return "\n".join(message_parts) diff --git a/services/poi_monitor/tests/test_analyzer.py b/services/poi_monitor/tests/test_analyzer.py new file mode 100644 index 0000000..492565c --- /dev/null +++ b/services/poi_monitor/tests/test_analyzer.py @@ -0,0 +1,178 @@ +import pytest +from datetime import datetime +from unittest.mock import Mock, patch, MagicMock +from src.analyzer import PoiAnalyzer +from src.database import Database +from src.notification import SlackNotifier +import requests + + +@pytest.fixture +def mock_db(): + db = Mock(spec=Database) + # Create a context manager mock + context_mock = MagicMock() + mock_conn = MagicMock() + mock_cursor = MagicMock() + + # Set up the context manager chain + db.get_connection.return_value = context_mock + context_mock.__enter__.return_value = mock_conn + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + # Store cursor on db for tests that need it + db._test_cursor = mock_cursor + return db # Return just the db, not the tuple + + +@pytest.fixture +def mock_notifier(): + return Mock(spec=SlackNotifier) + + +@pytest.fixture +def analyzer(mock_db, mock_notifier): + return PoiAnalyzer(mock_db, mock_notifier) + + +def test_analyze_pois_no_discrepancy(analyzer, mock_db): + # Setup + deployment_id = "Qm123" + block_number = 1000 + + # Mock database responses + mock_db.check_notification_sent.return_value = False + mock_db.get_latest_pois.return_value = { + "poi_hash_1": {"indexer1", "indexer2"} # Single POI hash = no discrepancy + } + + # Execute + result = analyzer.analyze_pois(deployment_id, block_number) + + # Assert + assert result is None + mock_db.check_notification_sent.assert_called_once_with(deployment_id, block_number) + mock_db.get_latest_pois.assert_called_once_with(deployment_id, block_number) + + +def test_analyze_pois_with_discrepancy(analyzer, mock_db): + # Setup + deployment_id = "Qm123" + block_number = 1000 + + # Mock database responses + mock_db.check_notification_sent.return_value = False + mock_db.get_latest_pois.return_value = { + "poi_hash_1": {"indexer1"}, + "poi_hash_2": {"indexer2"}, # Two different POI hashes = discrepancy + } + + # Execute + result = analyzer.analyze_pois(deployment_id, block_number) + + # Assert + assert result is not None + assert result["deployment_cid"] == deployment_id + assert result["block_number"] == block_number + assert result["submissions"] == mock_db.get_latest_pois.return_value + + +def test_check_poi_reuse(analyzer): + """Test POI reuse detection.""" + # Setup + submissions = {"poi_hash_1": {"indexer1"}, "poi_hash_2": {"indexer2"}} + + # Mock database responses + mock_cursor = analyzer.db._test_cursor + mock_cursor.fetchall.return_value = [ + # Match the columns from the query: + # poi, deployment_id, block_number, indexer_address, network_name, submission_time + ("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", datetime.now()), + ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", datetime.now()), + ] + + # Execute + result = analyzer._check_poi_reuse(submissions) + + # Assert + assert "poi_hash_1" in result + assert len(result["poi_hash_1"]) == 1 + assert "Previously used" in result["poi_hash_1"][0] + + +def test_analyze_pois_already_notified(analyzer, mock_db): + """Test that we don't re-notify about known discrepancies.""" + deployment_id = "Qm123" + block_number = 1000 + + mock_db.check_notification_sent.return_value = True + + result = analyzer.analyze_pois(deployment_id, block_number) + assert result is None + mock_db.get_latest_pois.assert_not_called() + + +def test_analyze_pois_no_submissions(analyzer, mock_db): + """Test handling of blocks with no POI submissions.""" + deployment_id = "Qm123" + block_number = 1000 + + mock_db.check_notification_sent.return_value = False + mock_db.get_latest_pois.return_value = {} + + result = analyzer.analyze_pois(deployment_id, block_number) + assert result is None + + +def test_process_new_submissions_handles_errors(analyzer, mock_db): + """Test error handling in the main processing loop.""" + # Mock _get_recent_submissions to return some test data + analyzer._get_recent_submissions = Mock( + return_value=[("Qm123", 1000), ("Qm456", 2000)] + ) + + # Make analyze_pois raise an exception for the second submission + def mock_analyze(deployment_id, block_number): + if deployment_id == "Qm456": + raise Exception("Test error") + return None + + analyzer.analyze_pois = Mock(side_effect=mock_analyze) + mock_db.cleanup_old_notifications = Mock() + + # This should not raise an exception and should continue processing + analyzer.process_new_submissions() + + # Verify we tried to process both submissions + assert analyzer.analyze_pois.call_count == 2 + # Verify cleanup was still called + mock_db.cleanup_old_notifications.assert_called_once() + + +def test_get_recent_submissions_handles_api_errors(analyzer): + """Test handling of GraphQL API errors.""" + with patch("requests.post") as mock_post: + # Mock a failed API response + mock_post.side_effect = requests.exceptions.RequestException("API Error") + + result = analyzer._get_recent_submissions() + assert result == [] # Should return empty list on error + + +def test_check_poi_reuse_with_multiple_reuses(analyzer): + """Test POI reuse detection with multiple reuse patterns.""" + submissions = {"poi_hash_1": {"indexer1"}, "poi_hash_2": {"indexer2"}} + + now = datetime.now() + + # Mock database responses + mock_cursor = analyzer.db._test_cursor + mock_cursor.fetchall.return_value = [ + ("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", now), + ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", now), + ("poi_hash_2", "deployment1", 1000, b"addr1", "mainnet", now), + ("poi_hash_2", "deployment1", 950, b"addr1", "mainnet", now), + ] + + result = analyzer._check_poi_reuse(submissions) + assert len(result) == 2 # Both POIs were reused diff --git a/services/poi_monitor/tests/test_database.py b/services/poi_monitor/tests/test_database.py new file mode 100644 index 0000000..14cabd6 --- /dev/null +++ b/services/poi_monitor/tests/test_database.py @@ -0,0 +1,120 @@ +import pytest +from unittest.mock import Mock, MagicMock, patch +from src.database import Database +import psycopg2 + + +@pytest.fixture +def mock_conn(): + conn = MagicMock() + with patch("psycopg2.connect", return_value=conn): + yield conn + + +@pytest.fixture +def database(mock_conn): + """Create a Database instance with mocked connection.""" + with patch("psycopg2.pool.SimpleConnectionPool") as mock_pool: + # Mock the connection pool + pool = MagicMock() + pool.getconn.return_value = mock_conn + mock_pool.return_value = pool + + # Mock migrations + with patch("src.migration.MigrationManager") as mock_manager: + mock_manager_instance = Mock() + mock_manager.return_value = mock_manager_instance + + db = Database() + return db + + +def test_database_connection_retry(mock_conn): + """Test that database connection retries on failure.""" + with patch("psycopg2.pool.SimpleConnectionPool") as mock_pool: + # Make pool creation fail twice then succeed + mock_pool.side_effect = [ + psycopg2.Error("Test error"), + psycopg2.Error("Test error"), + MagicMock(), # Successful pool + ] + + # Mock migrations + with patch("src.migration.MigrationManager") as mock_manager: + mock_manager_instance = Mock() + mock_manager.return_value = mock_manager_instance + with patch("time.sleep"): # Don't actually sleep in tests + db = Database() + # Verify pool was created + assert db.pool is not None + + +def test_get_latest_pois(database, mock_conn): + """Test fetching latest POI submissions.""" + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = [ + ("poi1", "indexer1"), + ("poi1", "indexer2"), + ("poi2", "indexer3"), + ] + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + result = database.get_latest_pois("deployment1", 1000) + + assert len(result) == 2 # Two unique POIs + assert result["poi1"] == {"indexer1", "indexer2"} + assert result["poi2"] == {"indexer3"} + + +def test_check_notification_sent(database, mock_conn): + """Test checking if notification was already sent.""" + mock_cursor = MagicMock() + mock_cursor.fetchone.return_value = (True,) + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + result = database.check_notification_sent("deployment1", 1000) + assert result is True + + +def test_record_notification(database, mock_conn): + """Test recording a notification.""" + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + database.record_notification("deployment1", 1000, "test message") + + # Verify the INSERT query was executed with correct parameters + mock_cursor.execute.assert_any_call( + """ + WITH current_pois AS ( + SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set + FROM pois p + JOIN blocks b ON b.id = p.block_id + JOIN sg_deployments d ON d.id = p.sg_deployment_id + WHERE d.ipfs_cid = %s AND b.number = %s + ) + INSERT INTO poi_notifications (deployment_id, block_number, message, sent_at, poi_set) + SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] + FROM current_pois c + """, + ("deployment1", 1000, "deployment1", 1000, "test message"), + ) + mock_conn.commit.assert_called_once() + + +def test_cleanup_old_notifications(database, mock_conn): + """Test cleaning up old notifications.""" + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + database.cleanup_old_notifications(days=30) + + # Verify the DELETE query was executed with correct parameters + mock_cursor.execute.assert_any_call( + """ + DELETE FROM poi_notifications + WHERE sent_at < NOW() - INTERVAL '%s days' + """, + (30,), + ) + mock_conn.commit.assert_called_once() diff --git a/services/poi_monitor/tests/test_migration.py b/services/poi_monitor/tests/test_migration.py new file mode 100644 index 0000000..96c71fb --- /dev/null +++ b/services/poi_monitor/tests/test_migration.py @@ -0,0 +1,41 @@ +import pytest +from unittest.mock import MagicMock +from src.migration import MigrationManager + + +@pytest.fixture +def mock_conn(): + return MagicMock() + + +@pytest.fixture +def manager(mock_conn): + return MigrationManager(mock_conn) + + +def test_ensure_migration_table(manager, mock_conn): + """Test migration table creation.""" + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + # Reset the mock to clear any previous calls + mock_conn.commit.reset_mock() + + manager._ensure_migration_table() + + mock_cursor.execute.assert_called_once() + mock_conn.commit.assert_called_once() + + +def test_get_applied_migrations(manager, mock_conn): + """Test fetching applied migrations.""" + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = [ + ("001_create_notifications_table",), + ("002_another_migration",), + ] + mock_conn.cursor.return_value.__enter__.return_value = mock_cursor + + result = manager.get_applied_migrations() + assert len(result) == 2 + assert "001_create_notifications_table" in result diff --git a/services/poi_monitor/tests/test_monitor.py b/services/poi_monitor/tests/test_monitor.py new file mode 100644 index 0000000..3201f2d --- /dev/null +++ b/services/poi_monitor/tests/test_monitor.py @@ -0,0 +1,78 @@ +import pytest +from unittest.mock import Mock, patch +from src.monitor import main +from src.database import Database +from src.notification import SlackNotifier +from src.analyzer import PoiAnalyzer + + +def test_main_initializes_components(): + """Test that main properly initializes all components.""" + with patch("src.monitor.Database") as mock_db_class, patch( + "src.monitor.SlackNotifier" + ) as mock_notifier_class, patch( + "src.monitor.PoiAnalyzer" + ) as mock_analyzer_class, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ): # Break the loop + # Setup mocks + mock_db = Mock(spec=Database) + mock_notifier = Mock(spec=SlackNotifier) + mock_analyzer = Mock(spec=PoiAnalyzer) + + mock_db_class.return_value = mock_db + mock_notifier_class.return_value = mock_notifier + mock_analyzer_class.return_value = mock_analyzer + + # Run main (will be interrupted by KeyboardInterrupt) + main() + + # Verify components were initialized + mock_db_class.assert_called_once() + mock_notifier_class.assert_called_once() + mock_analyzer_class.assert_called_once_with(mock_db, mock_notifier) + + +def test_main_handles_initialization_error(): + """Test that main properly handles initialization errors.""" + with patch("src.monitor.Database", side_effect=Exception("Test error")), patch( + "src.monitor.logger" + ) as mock_logger: + with pytest.raises(Exception): + main() + + mock_logger.error.assert_called_once() + + +def test_main_processes_submissions(): + """Test that main calls process_new_submissions.""" + with patch("src.monitor.Database"), patch( + "src.monitor.SlackNotifier" + ), patch( + "src.monitor.PoiAnalyzer" + ) as mock_analyzer_class, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ): # Stop after first run + # Setup mocks + mock_analyzer = Mock(spec=PoiAnalyzer) + mock_analyzer_class.return_value = mock_analyzer + + # Run main + main() + + # Verify process_new_submissions was called + assert mock_analyzer.process_new_submissions.call_count == 1 + + +def test_main_handles_keyboard_interrupt(): + """Test that main handles keyboard interrupt gracefully.""" + with patch("src.monitor.Database"), patch( + "src.monitor.SlackNotifier" + ) as _, patch( + "src.monitor.PoiAnalyzer" + ) as _, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ), patch("src.monitor.logger") as mock_logger: + main() + + mock_logger.info.assert_any_call("Shutting down POI Monitor service...") diff --git a/services/poi_monitor/tests/test_notification.py b/services/poi_monitor/tests/test_notification.py new file mode 100644 index 0000000..470f596 --- /dev/null +++ b/services/poi_monitor/tests/test_notification.py @@ -0,0 +1,48 @@ +import pytest +from unittest.mock import patch +from src.notification import SlackNotifier +import requests + + +@pytest.fixture +def notifier(): + with patch.dict("os.environ", {"SLACK_WEBHOOK_URL": "http://test.url"}): + return SlackNotifier() + + +def test_send_notification_success(notifier): + """Test successful notification sending.""" + with patch("requests.post") as mock_post: + mock_post.return_value.raise_for_status.return_value = None + + result = notifier.send_notification("test message") + + assert result is True + mock_post.assert_called_once() + + +def test_send_notification_failure(notifier): + """Test handling of notification failure.""" + with patch("requests.post") as mock_post: + mock_post.side_effect = requests.exceptions.RequestException("Test error") + + result = notifier.send_notification("test message") + + assert result is False + + +def test_format_poi_discrepancy_message(notifier): + """Test message formatting.""" + data = { + "deployment_cid": "Qm123", + "block_number": 1000, + "submissions": {"poi1": {"indexer1", "indexer2"}, "poi2": {"indexer3"}}, + "reuse_info": {"poi2": ["Previously used in deployment X"]}, + } + + message = notifier.format_poi_discrepancy_message(data) + + assert "🚨" in message + assert "Qm123" in message + assert "indexer1" in message + assert "Previously used in deployment X" in message