diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 604fd53c..798daad8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,14 +3,14 @@ exclude: "user_agents|website/web/sri.txt" repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.1.0 + rev: v4.5.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer - id: check-yaml - id: check-added-large-files - repo: https://github.com/asottile/pyupgrade - rev: v2.31.1 + rev: v3.15.0 hooks: - id: pyupgrade args: [--py38-plus] diff --git a/bin/archiver.py b/bin/archiver.py index bab732a9..1d5f1c63 100755 --- a/bin/archiver.py +++ b/bin/archiver.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import csv import gzip import logging @@ -23,7 +25,7 @@ class Archiver(AbstractManager): - def __init__(self, loglevel: Optional[int]=None): + def __init__(self, loglevel: int | None=None) -> None: super().__init__(loglevel) self.script_name = 'archiver' self.redis = Redis(unix_socket_path=get_socket_path('cache')) @@ -54,7 +56,7 @@ def __init__(self, loglevel: Optional[int]=None): self.s3fs_bucket = s3fs_config['config']['bucket_name'] self.s3fs_client.clear_multipart_uploads(self.s3fs_bucket) - def _to_run_forever(self): + def _to_run_forever(self) -> None: archiving_done = False # NOTE: When we archive a big directory, moving *a lot* of files, expecially to MinIO # can take a very long time. In order to avoid being stuck on the archiving, we break that in chunks @@ -71,14 +73,14 @@ def _to_run_forever(self): # This call takes a very long time on MinIO self._update_all_capture_indexes() - def _update_index(self, root_dir: Path, *, s3fs_parent_dir: Optional[str]=None) -> Optional[Path]: + def _update_index(self, root_dir: Path, *, s3fs_parent_dir: str | None=None) -> Path | None: # returns a path to the index for the given directory logmsg = f'Updating index for {root_dir}' if s3fs_parent_dir: logmsg = f'{logmsg} (s3fs)' self.logger.info(logmsg) - current_index: Dict[str, str] = {} + current_index: dict[str, str] = {} index_file = root_dir / 'index' if index_file.exists(): try: @@ -91,11 +93,11 @@ def _update_index(self, root_dir: Path, *, s3fs_parent_dir: Optional[str]=None) # NOTE: should we remove if it has subs? index_file.unlink() - sub_indexes: List[Path] = [] - current_index_dirs: Set[str] = set(current_index.values()) - new_captures: Set[Path] = set() + sub_indexes: list[Path] = [] + current_index_dirs: set[str] = set(current_index.values()) + new_captures: set[Path] = set() # Directories that are actually in the listing. 
- current_dirs: Set[str] = set() + current_dirs: set[str] = set() if s3fs_parent_dir: s3fs_dir = '/'.join([s3fs_parent_dir, root_dir.name]) @@ -212,7 +214,7 @@ def _update_index(self, root_dir: Path, *, s3fs_parent_dir: Optional[str]=None) return index_file - def _update_all_capture_indexes(self, *, recent_only: bool=False): + def _update_all_capture_indexes(self, *, recent_only: bool=False) -> None: '''Run that after the captures are in the proper directories''' # Recent captures self.logger.info('Update recent indexes') @@ -278,7 +280,7 @@ def __archive_single_capture(self, capture_path: Path) -> Path: return dest_dir / capture_path.name - def _archive(self): + def _archive(self) -> bool: archive_interval = timedelta(days=get_config('generic', 'archive')) cut_time = (datetime.now() - archive_interval) self.logger.info(f'Archiving all captures older than {cut_time.isoformat()}.') @@ -340,7 +342,7 @@ def _archive(self): self.logger.info('Archiving done.') return archiving_done - def __load_index(self, index_path: Path, ignore_sub: bool=False) -> Dict[str, str]: + def __load_index(self, index_path: Path, ignore_sub: bool=False) -> dict[str, str]: '''Loads the given index file and all the subsequent ones if they exist''' # NOTE: this method is used on recent and archived captures, it must never trigger a dir listing indexed_captures = {} @@ -359,7 +361,7 @@ def __load_index(self, index_path: Path, ignore_sub: bool=False) -> Dict[str, st indexed_captures[key] = str(index_path.parent / path_name) return indexed_captures - def _load_indexes(self): + def _load_indexes(self) -> None: # capture_dir / Year / Month / index <- should always exists. If not, created by _update_index # Initialize recent index for index in sorted(get_captures_dir().glob('*/*/index'), reverse=True): @@ -391,7 +393,7 @@ def _load_indexes(self): self.logger.info(f'Archived indexes loaded: {total_archived_captures} entries.') -def main(): +def main() -> None: a = Archiver() a.run(sleep_in_sec=3600) diff --git a/bin/async_capture.py b/bin/async_capture.py index dd276c68..e9d5a09d 100755 --- a/bin/async_capture.py +++ b/bin/async_capture.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import asyncio import json import logging @@ -10,7 +12,7 @@ from typing import Optional, Set, Union from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore -from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy +from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy # type: ignore[attr-defined] from lookyloo.lookyloo import Lookyloo, CaptureSettings from lookyloo.default import AbstractManager, get_config @@ -23,7 +25,7 @@ class AsyncCapture(AbstractManager): - def __init__(self, loglevel: Optional[int]=None): + def __init__(self, loglevel: int | None=None) -> None: super().__init__(loglevel) self.script_name = 'async_capture' self.only_global_lookups: bool = get_config('generic', 'only_global_lookups') @@ -31,7 +33,7 @@ def __init__(self, loglevel: Optional[int]=None): self.lookyloo = Lookyloo() if isinstance(self.lookyloo.lacus, LacusCore): - self.captures: Set[asyncio.Task] = set() + self.captures: set[asyncio.Task] = set() # type: ignore[type-arg] self.fox = FOX(config_name='FOX') if not self.fox.available: @@ -41,23 +43,24 @@ def thirdparty_submit(self, url: str) -> None: if self.fox.available: self.fox.capture_default_trigger(url, auto_trigger=True) - async def 
_trigger_captures(self): + async def _trigger_captures(self) -> None: + # Only called if LacusCore is used max_new_captures = get_config('generic', 'async_capture_processes') - len(self.captures) self.logger.debug(f'{len(self.captures)} ongoing captures.') if max_new_captures <= 0: self.logger.info(f'Max amount of captures in parallel reached ({len(self.captures)})') - return - for capture_task in self.lookyloo.lacus.consume_queue(max_new_captures): + return None + for capture_task in self.lookyloo.lacus.consume_queue(max_new_captures): # type: ignore[union-attr] self.captures.add(capture_task) capture_task.add_done_callback(self.captures.discard) - def uuids_ready(self): + def uuids_ready(self) -> list[str]: return [uuid for uuid in self.lookyloo.redis.zrevrangebyscore('to_capture', 'Inf', '-Inf') if uuid and self.lookyloo.lacus.get_capture_status(uuid) in [CaptureStatusPy.DONE, CaptureStatusCore]] def process_capture_queue(self) -> None: '''Process a query from the capture queue''' - entries: Union[CaptureResponseCore, CaptureResponsePy] + entries: CaptureResponseCore | CaptureResponsePy for uuid in self.uuids_ready(): if isinstance(self.lookyloo.lacus, LacusCore): entries = self.lookyloo.lacus.get_capture(uuid, decode=True) @@ -71,9 +74,9 @@ def process_capture_queue(self) -> None: self.logger.info(log) self.lookyloo.redis.sadd('ongoing', uuid) - queue: Optional[str] = self.lookyloo.redis.getdel(f'{uuid}_mgmt') + queue: str | None = self.lookyloo.redis.getdel(f'{uuid}_mgmt') - to_capture: CaptureSettings = self.lookyloo.redis.hgetall(uuid) + to_capture: CaptureSettings = self.lookyloo.redis.hgetall(uuid) # type: ignore[assignment] if get_config('generic', 'default_public'): # By default, the captures are on the index, unless the user mark them as un-listed @@ -123,9 +126,9 @@ def process_capture_queue(self) -> None: self.unset_running() self.logger.info(f'Done with {uuid}') - async def _to_run_forever_async(self): + async def _to_run_forever_async(self) -> None: if self.force_stop: - return + return None if isinstance(self.lookyloo.lacus, LacusCore): await self._trigger_captures() @@ -135,7 +138,7 @@ async def _to_run_forever_async(self): self.process_capture_queue() - async def _wait_to_finish_async(self): + async def _wait_to_finish_async(self) -> None: if isinstance(self.lookyloo.lacus, LacusCore): while self.captures: self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...') @@ -147,7 +150,7 @@ async def _wait_to_finish_async(self): self.logger.info('No more captures') -def main(): +def main() -> None: m = AsyncCapture() loop = asyncio.new_event_loop() diff --git a/bin/background_indexer.py b/bin/background_indexer.py index 44df55d2..1ab3ec9b 100755 --- a/bin/background_indexer.py +++ b/bin/background_indexer.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import logging import logging.config import os @@ -20,7 +22,7 @@ class BackgroundIndexer(AbstractManager): - def __init__(self, loglevel: Optional[int]=None): + def __init__(self, loglevel: int | None=None): super().__init__(loglevel) self.lookyloo = Lookyloo() self.script_name = 'background_indexer' @@ -28,7 +30,7 @@ def __init__(self, loglevel: Optional[int]=None): self.discarded_captures_dir = self.lookyloo.capture_dir.parent / 'discarded_captures' self.discarded_captures_dir.mkdir(parents=True, exist_ok=True) - def _to_run_forever(self): + def _to_run_forever(self) -> None: all_done = self._build_missing_pickles() if all_done: self._check_indexes() @@ -72,7 +74,7 @@ def 
_build_missing_pickles(self) -> bool: # The capture with this UUID exists, but it is for some reason missing in lookup_dirs self.lookyloo.redis.hset('lookup_dirs', uuid, str(path)) else: - cached_path = Path(self.lookyloo.redis.hget('lookup_dirs', uuid)) + cached_path = Path(self.lookyloo.redis.hget('lookup_dirs', uuid)) # type: ignore[arg-type] if cached_path != path: # we have a duplicate UUID, it is proably related to some bad copy/paste if cached_path.exists(): @@ -118,13 +120,13 @@ def _build_missing_pickles(self) -> bool: return True return False - def _check_indexes(self): + def _check_indexes(self) -> None: index_redis = self.lookyloo.indexing.redis can_index = index_redis.set('ongoing_indexing', 1, ex=3600, nx=True) if not can_index: # There is no reason to run this method in multiple scripts. self.logger.info('Indexing already ongoing in another process.') - return + return None self.logger.info('Check indexes...') for cache in self.lookyloo.sorted_capture_cache(cached_captures_only=False): if self.lookyloo.is_public_instance and cache.no_index: @@ -163,7 +165,7 @@ def _check_indexes(self): self.logger.info('... done.') -def main(): +def main() -> None: i = BackgroundIndexer() i.run(sleep_in_sec=60) diff --git a/bin/background_processing.py b/bin/background_processing.py index cdc23582..21515d65 100755 --- a/bin/background_processing.py +++ b/bin/background_processing.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json import time import logging @@ -8,7 +10,7 @@ from datetime import date, timedelta from typing import Any, Dict, Optional -from lookyloo.lookyloo import Lookyloo, CaptureStatusCore, CaptureStatusPy +from lookyloo.lookyloo import Lookyloo, CaptureStatusCore, CaptureStatusPy # type: ignore[attr-defined] from lookyloo.default import AbstractManager, get_config, get_homedir, safe_create_dir from lookyloo.helpers import ParsedUserAgent, serialize_to_json @@ -17,19 +19,19 @@ class Processing(AbstractManager): - def __init__(self, loglevel: Optional[int]=None): + def __init__(self, loglevel: int | None=None): super().__init__(loglevel) self.script_name = 'processing' self.lookyloo = Lookyloo() self.use_own_ua = get_config('generic', 'use_user_agents_users') - def _to_run_forever(self): + def _to_run_forever(self) -> None: if self.use_own_ua: self._build_ua_file() self._retry_failed_enqueue() - def _build_ua_file(self): + def _build_ua_file(self) -> None: '''Build a file in a format compatible with the capture page''' yesterday = (date.today() - timedelta(days=1)) self_generated_ua_file_path = get_homedir() / 'own_user_agents' / str(yesterday.year) / f'{yesterday.month:02}' @@ -44,7 +46,7 @@ def _build_ua_file(self): self.logger.info(f'No User-agent file for {yesterday} to generate.') return - to_store: Dict[str, Any] = {'by_frequency': []} + to_store: dict[str, Any] = {'by_frequency': []} uas = Counter([entry.split('|', 1)[1] for entry in entries]) for ua, _ in uas.most_common(): parsed_ua = ParsedUserAgent(ua) @@ -71,7 +73,7 @@ def _build_ua_file(self): self.lookyloo.redis.delete(f'user_agents|{yesterday.isoformat()}') self.logger.info(f'User-agent file for {yesterday} generated.') - def _retry_failed_enqueue(self): + def _retry_failed_enqueue(self) -> None: '''If enqueuing failed, the settings are added, with a UUID in the 'to_capture key', and they have a UUID''' for uuid in self.lookyloo.redis.zrevrangebyscore('to_capture', 'Inf', '-Inf'): try_reenqueue = False @@ -131,7 +133,7 @@ def _retry_failed_enqueue(self): 
self.logger.info(f'{uuid} enqueued.') -def main(): +def main() -> None: p = Processing() p.run(sleep_in_sec=30) diff --git a/bin/run_backend.py b/bin/run_backend.py index 551ed717..200e6ba3 100755 --- a/bin/run_backend.py +++ b/bin/run_backend.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import argparse import os import time @@ -24,14 +26,14 @@ def check_running(name: str) -> bool: return False -def launch_cache(storage_directory: Optional[Path]=None): +def launch_cache(storage_directory: Path | None=None) -> None: if not storage_directory: storage_directory = get_homedir() if not check_running('cache'): Popen(["./run_redis.sh"], cwd=(storage_directory / 'cache')) -def shutdown_cache(storage_directory: Optional[Path]=None): +def shutdown_cache(storage_directory: Path | None=None) -> None: if not storage_directory: storage_directory = get_homedir() r = Redis(unix_socket_path=get_socket_path('cache')) @@ -39,14 +41,14 @@ def shutdown_cache(storage_directory: Optional[Path]=None): print('Redis cache database shutdown.') -def launch_indexing(storage_directory: Optional[Path]=None): +def launch_indexing(storage_directory: Path | None=None) -> None: if not storage_directory: storage_directory = get_homedir() if not check_running('indexing'): Popen(["./run_redis.sh"], cwd=(storage_directory / 'indexing')) -def shutdown_indexing(storage_directory: Optional[Path]=None): +def shutdown_indexing(storage_directory: Path | None=None) -> None: if not storage_directory: storage_directory = get_homedir() r = Redis(unix_socket_path=get_socket_path('indexing')) @@ -54,13 +56,13 @@ def shutdown_indexing(storage_directory: Optional[Path]=None): print('Redis indexing database shutdown.') -def launch_all(): +def launch_all() -> None: launch_cache() launch_indexing() -def check_all(stop: bool=False): - backends: Dict[str, bool] = {'cache': False, 'indexing': False} +def check_all(stop: bool=False) -> None: + backends: dict[str, bool] = {'cache': False, 'indexing': False} while True: for db_name in backends.keys(): try: @@ -81,12 +83,12 @@ def check_all(stop: bool=False): time.sleep(1) -def stop_all(): +def stop_all() -> None: shutdown_cache() shutdown_indexing() -def main(): +def main() -> None: parser = argparse.ArgumentParser(description='Manage backend DBs.') parser.add_argument("--start", action='store_true', default=False, help="Start all") parser.add_argument("--stop", action='store_true', default=False, help="Stop all") diff --git a/bin/shutdown.py b/bin/shutdown.py index 047468a4..c1b9fea9 100755 --- a/bin/shutdown.py +++ b/bin/shutdown.py @@ -5,7 +5,7 @@ from lookyloo.default import AbstractManager -def main(): +def main() -> None: AbstractManager.force_shutdown() time.sleep(5) while True: diff --git a/bin/start.py b/bin/start.py index df48ac1a..30fadd1e 100755 --- a/bin/start.py +++ b/bin/start.py @@ -5,7 +5,7 @@ from lookyloo.default import get_homedir -def main(): +def main() -> None: # Just fail if the env isn't set. 
get_homedir() print('Start backend (redis)...') diff --git a/bin/start_website.py b/bin/start_website.py index b8d2a6ae..83b20521 100755 --- a/bin/start_website.py +++ b/bin/start_website.py @@ -13,13 +13,13 @@ class Website(AbstractManager): - def __init__(self, loglevel: Optional[int]=None): + def __init__(self, loglevel: Optional[int]=None) -> None: super().__init__(loglevel) self.script_name = 'website' - self.process = self._launch_website() + self.process: Popen = self._launch_website() # type: ignore[type-arg] self.set_running() - def _launch_website(self): + def _launch_website(self) -> Popen: # type: ignore[type-arg] website_dir = get_homedir() / 'website' ip = get_config('generic', 'website_listen_ip') port = get_config('generic', 'website_listen_port') @@ -32,7 +32,7 @@ def _launch_website(self): cwd=website_dir) -def main(): +def main() -> None: w = Website() w.run(sleep_in_sec=10) diff --git a/bin/stop.py b/bin/stop.py index 68b8d121..a9126448 100755 --- a/bin/stop.py +++ b/bin/stop.py @@ -8,7 +8,7 @@ from lookyloo.default import get_homedir, get_socket_path -def main(): +def main() -> None: get_homedir() p = Popen(['shutdown']) p.wait() diff --git a/bin/update.py b/bin/update.py index 6d18dcfc..ae6cde8f 100755 --- a/bin/update.py +++ b/bin/update.py @@ -15,14 +15,14 @@ logging.config.dictConfig(get_config('logging')) -def compute_hash_self(): +def compute_hash_self() -> bytes: m = hashlib.sha256() with (get_homedir() / 'bin' / 'update.py').open('rb') as f: m.update(f.read()) return m.digest() -def keep_going(ignore=False): +def keep_going(ignore: bool=False) -> None: if ignore: return keep_going = input('Continue? (y/N) ') @@ -31,7 +31,7 @@ def keep_going(ignore=False): sys.exit() -def run_command(command, expect_fail: bool=False, capture_output: bool=True): +def run_command(command: str, expect_fail: bool=False, capture_output: bool=True) -> None: args = shlex.split(command) homedir = get_homedir() process = subprocess.run(args, cwd=homedir, capture_output=capture_output) @@ -42,7 +42,7 @@ def run_command(command, expect_fail: bool=False, capture_output: bool=True): sys.exit() -def check_poetry_version(): +def check_poetry_version() -> None: args = shlex.split("poetry self -V") homedir = get_homedir() process = subprocess.run(args, cwd=homedir, capture_output=True) @@ -58,7 +58,7 @@ def check_poetry_version(): sys.exit() -def main(): +def main() -> None: parser = argparse.ArgumentParser(description='Pull latest release, update dependencies, update and validate the config files, update 3rd deps for the website.') parser.add_argument('--yes', default=False, action='store_true', help='Run all commands without asking.') args = parser.parse_args() diff --git a/lookyloo/__init__.py b/lookyloo/__init__.py index 967b94b7..376f4d41 100644 --- a/lookyloo/__init__.py +++ b/lookyloo/__init__.py @@ -1,3 +1,8 @@ import logging +from .lookyloo import Lookyloo # noqa +from .indexing import Indexing # noqa + logging.getLogger(__name__).addHandler(logging.NullHandler()) + +__all__ = ['Lookyloo', 'Indexing'] diff --git a/lookyloo/capturecache.py b/lookyloo/capturecache.py index 1b9a5abc..5cb18101 100644 --- a/lookyloo/capturecache.py +++ b/lookyloo/capturecache.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import contextlib import gzip import json @@ -13,15 +15,15 @@ from collections.abc import Mapping from datetime import datetime -from functools import lru_cache +from functools import lru_cache, _CacheInfo as CacheInfo from logging import Logger, 
LoggerAdapter from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union, Set, MutableMapping +from typing import Any, Dict, List, Optional, Tuple, Union, Set, MutableMapping, Iterator import dns.rdatatype import dns.resolver -from har2tree import CrawledTree, Har2TreeError, HarFile -from pyipasnhistory import IPASNHistory +from har2tree import CrawledTree, Har2TreeError, HarFile # type: ignore[attr-defined] +from pyipasnhistory import IPASNHistory # type: ignore[attr-defined] from redis import Redis from .context import Context @@ -32,11 +34,11 @@ from .modules import Cloudflare -class LookylooCacheLogAdapter(LoggerAdapter): +class LookylooCacheLogAdapter(LoggerAdapter): # type: ignore[type-arg] """ Prepend log entry with the UUID of the capture """ - def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]: + def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> tuple[str, MutableMapping[str, Any]]: if self.extra: return '[{}] {}'.format(self.extra['uuid'], msg), kwargs return msg, kwargs @@ -47,10 +49,10 @@ class CaptureCache(): 'error', 'no_index', 'categories', 'parent', 'user_agent', 'referer', 'logger') - def __init__(self, cache_entry: Dict[str, Any]): + def __init__(self, cache_entry: dict[str, Any]): logger = logging.getLogger(f'{self.__class__.__name__}') logger.setLevel(get_config('generic', 'loglevel')) - __default_cache_keys: Tuple[str, str, str, str, str, str] = ('uuid', 'title', 'timestamp', + __default_cache_keys: tuple[str, str, str, str, str, str] = ('uuid', 'title', 'timestamp', 'url', 'redirects', 'capture_dir') if 'uuid' not in cache_entry or 'capture_dir' not in cache_entry: raise LookylooException(f'The capture is deeply broken: {cache_entry}') @@ -80,16 +82,16 @@ def __init__(self, cache_entry: Dict[str, Any]): # If the microsecond is missing (0), it fails self.timestamp = datetime.strptime(cache_entry['timestamp'], '%Y-%m-%dT%H:%M:%S%z') - self.redirects: List[str] = json.loads(cache_entry['redirects']) if cache_entry.get('redirects') else [] + self.redirects: list[str] = json.loads(cache_entry['redirects']) if cache_entry.get('redirects') else [] # Error without all the keys in __default_cache_keys was fatal. 
# if the keys in __default_cache_keys are present, it was an HTTP error and we still need to pass the error along - self.error: Optional[str] = cache_entry.get('error') + self.error: str | None = cache_entry.get('error') self.no_index: bool = True if cache_entry.get('no_index') in [1, '1'] else False - self.categories: List[str] = json.loads(cache_entry['categories']) if cache_entry.get('categories') else [] - self.parent: Optional[str] = cache_entry.get('parent') - self.user_agent: Optional[str] = cache_entry.get('user_agent') - self.referer: Optional[str] = cache_entry.get('referer') + self.categories: list[str] = json.loads(cache_entry['categories']) if cache_entry.get('categories') else [] + self.parent: str | None = cache_entry.get('parent') + self.user_agent: str | None = cache_entry.get('user_agent') + self.referer: str | None = cache_entry.get('referer') @property def tree(self) -> CrawledTree: @@ -142,26 +144,26 @@ def load_pickle_tree(capture_dir: Path, last_mod_time: int, logger: Logger) -> C raise NoValidHarFile("Couldn't find HAR files") -def serialize_sets(obj): +def serialize_sets(obj: Any) -> Any: if isinstance(obj, set): return list(obj) return obj -class CapturesIndex(Mapping): +class CapturesIndex(Mapping): # type: ignore[type-arg] - def __init__(self, redis: Redis, contextualizer: Optional[Context]=None): + def __init__(self, redis: Redis, contextualizer: Context | None=None) -> None: # type: ignore[type-arg] self.logger = logging.getLogger(f'{self.__class__.__name__}') self.logger.setLevel(get_config('generic', 'loglevel')) self.redis = redis self.indexing = Indexing() self.contextualizer = contextualizer - self.__cache: Dict[str, CaptureCache] = {} + self.__cache: dict[str, CaptureCache] = {} self._quick_init() self.timeout = get_config('generic', 'max_tree_create_time') try: - self.ipasnhistory: Optional[IPASNHistory] = IPASNHistory() + self.ipasnhistory: IPASNHistory | None = IPASNHistory() if not self.ipasnhistory.is_up: self.ipasnhistory = None except Exception as e: @@ -169,7 +171,7 @@ def __init__(self, redis: Redis, contextualizer: Optional[Context]=None): self.logger.warning(f'Unable to setup IPASN History: {e}') self.ipasnhistory = None try: - self.cloudflare: Optional[Cloudflare] = Cloudflare() + self.cloudflare: Cloudflare | None = Cloudflare() if not self.cloudflare.available: self.cloudflare = None except Exception as e: @@ -177,7 +179,7 @@ def __init__(self, redis: Redis, contextualizer: Optional[Context]=None): self.cloudflare = None @property - def cached_captures(self) -> Set[str]: + def cached_captures(self) -> set[str]: self._quick_init() return set(self.__cache.keys()) @@ -199,10 +201,10 @@ def __getitem__(self, uuid: str) -> CaptureCache: self.__cache[uuid] = self._set_capture_cache(capture_dir) return self.__cache[uuid] - def __iter__(self): - return iter(self.__cache) + def __iter__(self) -> Iterator[dict[str, CaptureCache]]: + return iter(self.__cache) # type: ignore[arg-type] - def __len__(self): + def __len__(self) -> int: return len(self.__cache) def reload_cache(self, uuid: str) -> None: @@ -221,7 +223,7 @@ def rebuild_all(self) -> None: self.redis.flushdb() self.__cache = {} - def lru_cache_status(self): + def lru_cache_status(self) -> CacheInfo: return load_pickle_tree.cache_info() def _quick_init(self) -> None: @@ -332,11 +334,11 @@ def _create_pickle(self, capture_dir: Path, logger: LookylooCacheLogAdapter) -> return tree @staticmethod - def _raise_timeout(_, __): + def _raise_timeout(_, __) -> None: # type: ignore[no-untyped-def] raise 
TimeoutError @contextlib.contextmanager - def _timeout_context(self): + def _timeout_context(self) -> Iterator[None]: if self.timeout != 0: # Register a function to raise a TimeoutError on the signal. signal.signal(signal.SIGALRM, self._raise_timeout) @@ -378,7 +380,7 @@ def _set_capture_cache(self, capture_dir_str: str) -> CaptureCache: logger.warning(f'Unable to rebuild the tree for {capture_dir}, the HAR files are broken.') tree = None - cache: Dict[str, Union[str, int]] = {'uuid': uuid, 'capture_dir': capture_dir_str} + cache: dict[str, str | int] = {'uuid': uuid, 'capture_dir': capture_dir_str} if capture_settings.get('url'): cache['url'] = capture_settings['url'] @@ -450,18 +452,18 @@ def _set_capture_cache(self, capture_dir_str: str) -> CaptureCache: p.execute() return CaptureCache(cache) - def __resolve_dns(self, ct: CrawledTree, logger: LookylooCacheLogAdapter): + def __resolve_dns(self, ct: CrawledTree, logger: LookylooCacheLogAdapter) -> CrawledTree: '''Resolves all domains of the tree, keeps A (IPv4), AAAA (IPv6), and CNAME entries and store them in ips.json and cnames.json, in the capture directory. Updates the nodes of the tree accordingly so the information is available. ''' - def _build_cname_chain(known_cnames: Dict[str, str], hostname) -> List[str]: + def _build_cname_chain(known_cnames: dict[str, str], hostname: str) -> list[str]: '''Returns a list of CNAMEs starting from one hostname. The CNAMEs resolutions are made in `_resolve_dns`. A hostname can have a CNAME entry and the CNAME entry can have an other CNAME entry, and so on multiple times. This method loops over the hostnames until there are no CNAMES.''' - cnames: List[str] = [] + cnames: list[str] = [] to_search = hostname while True: if not known_cnames.get(to_search): @@ -474,7 +476,7 @@ def _build_cname_chain(known_cnames: Dict[str, str], hostname) -> List[str]: ips_path = ct.root_hartree.har.path.parent / 'ips.json' ipasn_path = ct.root_hartree.har.path.parent / 'ipasn.json' - host_cnames: Dict[str, str] = {} + host_cnames: dict[str, str] = {} if cnames_path.exists(): try: with cnames_path.open() as f: @@ -483,7 +485,7 @@ def _build_cname_chain(known_cnames: Dict[str, str], hostname) -> List[str]: # The json is broken, delete and re-trigger the requests host_cnames = {} - host_ips: Dict[str, Dict[str, Set[str]]] = {} + host_ips: dict[str, dict[str, set[str]]] = {} if ips_path.exists(): try: with ips_path.open() as f: @@ -492,7 +494,7 @@ def _build_cname_chain(known_cnames: Dict[str, str], hostname) -> List[str]: # The json is broken, delete and re-trigger the requests host_ips = {} - ipasn: Dict[str, Dict[str, str]] = {} + ipasn: dict[str, dict[str, str]] = {} if ipasn_path.exists(): try: with ipasn_path.open() as f: diff --git a/lookyloo/comparator.py b/lookyloo/comparator.py index 66071edc..33b16be7 100644 --- a/lookyloo/comparator.py +++ b/lookyloo/comparator.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 +from __future__ import annotations + import fnmatch import logging from typing import Dict, Any, Union, List, Optional, TypedDict, Tuple -from har2tree import URLNode +from har2tree import URLNode # type: ignore[attr-defined] from redis import ConnectionPool, Redis from redis.connection import UnixDomainSocketConnection @@ -19,8 +21,8 @@ class CompareSettings(TypedDict): '''The settings that can be passed to the compare method to filter out some differences''' - ressources_ignore_domains: Tuple[str, ...] - ressources_ignore_regexes: Tuple[str, ...] + ressources_ignore_domains: tuple[str, ...] 
+ ressources_ignore_regexes: tuple[str, ...] ignore_ips: bool @@ -39,16 +41,16 @@ def __init__(self) -> None: self.public_domain = get_config('generic', 'public_domain') @property - def redis(self) -> Redis: + def redis(self) -> Redis: # type: ignore[type-arg] return Redis(connection_pool=self.redis_pool) - def get_comparables_node(self, node: URLNode) -> Dict[str, str]: + def get_comparables_node(self, node: URLNode) -> dict[str, str]: to_return = {'url': node.name, 'hostname': node.hostname} if hasattr(node, 'ip_address'): to_return['ip_address'] = str(node.ip_address) return to_return - def _compare_nodes(self, left: Dict[str, str], right: Dict[str, str], /, different: bool, ignore_ips: bool) -> Tuple[bool, Dict[str, Any]]: + def _compare_nodes(self, left: dict[str, str], right: dict[str, str], /, different: bool, ignore_ips: bool) -> tuple[bool, dict[str, Any]]: to_return = {} # URL if left['url'] != right['url']: @@ -78,12 +80,12 @@ def _compare_nodes(self, left: Dict[str, str], right: Dict[str, str], /, differe # IPs in hostnode + ASNs return different, to_return - def get_comparables_capture(self, capture_uuid: str) -> Dict[str, Any]: + def get_comparables_capture(self, capture_uuid: str) -> dict[str, Any]: if capture_uuid not in self._captures_index: raise MissingUUID(f'{capture_uuid} does not exists.') capture = self._captures_index[capture_uuid] - to_return: Dict[str, Any] + to_return: dict[str, Any] try: if capture.error: # The error on lookyloo is too verbose and contains the UUID of the capture, skip that. @@ -108,17 +110,17 @@ def get_comparables_capture(self, capture_uuid: str) -> Dict[str, Any]: to_return = {'error': str(e)} return to_return - def compare_captures(self, capture_left: str, capture_right: str, /, *, settings: Optional[CompareSettings]=None) -> Tuple[bool, Dict[str, Any]]: + def compare_captures(self, capture_left: str, capture_right: str, /, *, settings: CompareSettings | None=None) -> tuple[bool, dict[str, Any]]: if capture_left not in self._captures_index: raise MissingUUID(f'{capture_left} does not exists.') if capture_right not in self._captures_index: raise MissingUUID(f'{capture_right} does not exists.') different: bool = False - to_return: Dict[str, Dict[str, Union[str, - List[Union[str, Dict[str, Any]]], - Dict[str, Union[int, str, - List[Union[int, str, Dict[str, Any]]]]]]]] = {} + to_return: dict[str, dict[str, (str | + list[str | dict[str, Any]] | + dict[str, (int | str | + list[int | str | dict[str, Any]])])]] = {} to_return['lookyloo_urls'] = {'left': f'https://{self.public_domain}/tree/{capture_left}', 'right': f'https://{self.public_domain}/tree/{capture_right}'} left = self.get_comparables_capture(capture_left) @@ -192,7 +194,7 @@ def compare_captures(self, capture_left: str, capture_right: str, /, *, settings 'details': left['redirects']['length']} # Prepare settings - _settings: Optional[CompareSettings] + _settings: CompareSettings | None if settings: # cleanup the settings _ignore_domains = set(settings['ressources_ignore_domains'] if settings.get('ressources_ignore_domains') else []) diff --git a/lookyloo/context.py b/lookyloo/context.py index af8b4b3c..4a69f71d 100644 --- a/lookyloo/context.py +++ b/lookyloo/context.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json import logging from pathlib import Path from typing import Any, Dict, List, Optional, Set, Union from urllib.parse import urlsplit -from har2tree import CrawledTree, HostNode, URLNode +from har2tree import CrawledTree, HostNode, 
URLNode # type: ignore[attr-defined] from redis import Redis from .default import get_config, get_homedir, get_socket_path @@ -16,14 +18,14 @@ class Context(): - def __init__(self): + def __init__(self) -> None: self.logger = logging.getLogger(f'{self.__class__.__name__}') self.logger.setLevel(get_config('generic', 'loglevel')) - self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True) + self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True) # type: ignore[type-arg] self._cache_known_content() self.sanejs = SaneJavaScript(config_name='SaneJS') - def clear_context(self): + def clear_context(self) -> None: self.redis.flushdb() def _cache_known_content(self) -> None: @@ -55,13 +57,13 @@ def _cache_known_content(self) -> None: p.sadd(f'bh|{h}|legitimate', *details['hostnames']) p.execute() - def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode, str]) -> Dict[str, Any]: + def find_known_content(self, har2tree_container: CrawledTree | HostNode | URLNode | str) -> dict[str, Any]: """Return a dictionary of content resources found in the local known_content database, or in SaneJS (if enabled)""" if isinstance(har2tree_container, str): - to_lookup: Set[str] = {har2tree_container, } + to_lookup: set[str] = {har2tree_container, } else: to_lookup = get_resources_hashes(har2tree_container) - known_content_table: Dict[str, Any] = {} + known_content_table: dict[str, Any] = {} if not to_lookup: return known_content_table # get generic known content @@ -113,7 +115,7 @@ def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, UR return known_content_table - def store_known_legitimate_tree(self, tree: CrawledTree): + def store_known_legitimate_tree(self, tree: CrawledTree) -> None: known_content = self.find_known_content(tree) capture_file: Path = get_homedir() / 'known_content_user' / f'{urlsplit(tree.root_url).hostname}.json' if capture_file.exists(): @@ -156,7 +158,7 @@ def store_known_legitimate_tree(self, tree: CrawledTree): with open(capture_file, 'w') as f: json.dump(to_store, f, indent=2, default=serialize_to_json) - def mark_as_legitimate(self, tree: CrawledTree, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> None: + def mark_as_legitimate(self, tree: CrawledTree, hostnode_uuid: str | None=None, urlnode_uuid: str | None=None) -> None: if hostnode_uuid: urlnodes = tree.root_hartree.get_host_node_by_uuid(hostnode_uuid).urls elif urlnode_uuid: @@ -214,7 +216,7 @@ def contextualize_tree(self, tree: CrawledTree) -> CrawledTree: def legitimate_body(self, body_hash: str, legitimate_hostname: str) -> None: self.redis.sadd(f'bh|{body_hash}|legitimate', legitimate_hostname) - def store_known_malicious_ressource(self, ressource_hash: str, details: Dict[str, str]): + def store_known_malicious_ressource(self, ressource_hash: str, details: dict[str, str]) -> None: known_malicious_ressource_file = get_homedir() / 'known_content_user' / 'malicious.json' if known_malicious_ressource_file.exists(): with open(known_malicious_ressource_file) as f: @@ -236,7 +238,7 @@ def store_known_malicious_ressource(self, ressource_hash: str, details: Dict[str with open(known_malicious_ressource_file, 'w') as f: json.dump(to_store, f, indent=2, default=serialize_to_json) - def add_malicious(self, ressource_hash: str, details: Dict[str, str]): + def add_malicious(self, ressource_hash: str, details: dict[str, str]) -> None: 
self.store_known_malicious_ressource(ressource_hash, details) p = self.redis.pipeline() p.sadd('bh|malicious', ressource_hash) @@ -246,7 +248,7 @@ def add_malicious(self, ressource_hash: str, details: Dict[str, str]): p.sadd(f'{ressource_hash}|tag', details['type']) p.execute() - def store_known_legitimate_ressource(self, ressource_hash: str, details: Dict[str, str]): + def store_known_legitimate_ressource(self, ressource_hash: str, details: dict[str, str]) -> None: known_legitimate_ressource_file = get_homedir() / 'known_content_user' / 'legitimate.json' if known_legitimate_ressource_file.exists(): with open(known_legitimate_ressource_file) as f: @@ -267,7 +269,7 @@ def store_known_legitimate_ressource(self, ressource_hash: str, details: Dict[st with open(known_legitimate_ressource_file, 'w') as f: json.dump(to_store, f, indent=2, default=serialize_to_json) - def add_legitimate(self, ressource_hash: str, details: Dict[str, str]): + def add_legitimate(self, ressource_hash: str, details: dict[str, str]) -> None: self.store_known_legitimate_ressource(ressource_hash, details) if 'domain' in details: self.redis.sadd(f'bh|{ressource_hash}|legitimate', details['domain']) @@ -277,7 +279,7 @@ def add_legitimate(self, ressource_hash: str, details: Dict[str, str]): # Query DB - def is_legitimate(self, urlnode: URLNode, known_hashes: Dict[str, Any]) -> Optional[bool]: + def is_legitimate(self, urlnode: URLNode, known_hashes: dict[str, Any]) -> bool | None: """ If legitimate if generic, marked as legitimate or known on sanejs, loaded from the right domain 3 cases: @@ -285,7 +287,7 @@ def is_legitimate(self, urlnode: URLNode, known_hashes: Dict[str, Any]) -> Optio * False if *any* content is malicious * None in all other cases """ - status: List[Optional[bool]] = [] + status: list[bool | None] = [] for h in urlnode.resources_hashes: # Note: we can have multiple hashes on the same urlnode (see embedded resources). 
if h not in known_hashes: @@ -305,7 +307,7 @@ def is_legitimate(self, urlnode: URLNode, known_hashes: Dict[str, Any]) -> Optio return True # All the contents are known legitimate return None - def is_malicious(self, urlnode: URLNode, known_hashes: Dict[str, Any]) -> Optional[bool]: + def is_malicious(self, urlnode: URLNode, known_hashes: dict[str, Any]) -> bool | None: """3 cases: * True if *any* content is malicious * False if *all* the contents are known legitimate diff --git a/lookyloo/default/__init__.py b/lookyloo/default/__init__.py index 274658fd..a56c2828 100644 --- a/lookyloo/default/__init__.py +++ b/lookyloo/default/__init__.py @@ -16,3 +16,17 @@ from .helpers import get_homedir, load_configs, get_config, safe_create_dir, get_socket_path, try_make_file # noqa os.chdir(get_homedir()) + +__all__ = [ + 'LookylooException', + 'AbstractManager', + 'MissingEnv', + 'CreateDirectoryException', + 'ConfigError', + 'get_homedir', + 'load_configs', + 'get_config', + 'safe_create_dir', + 'get_socket_path', + 'try_make_file', +] diff --git a/lookyloo/default/abstractmanager.py b/lookyloo/default/abstractmanager.py index 1b96a5ca..70d85ad9 100644 --- a/lookyloo/default/abstractmanager.py +++ b/lookyloo/default/abstractmanager.py @@ -1,14 +1,16 @@ #!/usr/bin/env python3 +from __future__ import annotations + import asyncio import logging +import logging.config import os import signal import time from abc import ABC from datetime import datetime, timedelta from subprocess import Popen -from typing import List, Optional, Tuple from redis import Redis from redis.exceptions import ConnectionError as RedisConnectionError @@ -20,18 +22,18 @@ class AbstractManager(ABC): script_name: str - def __init__(self, loglevel: Optional[int]=None): + def __init__(self, loglevel: int | None=None): self.loglevel: int = loglevel if loglevel is not None else get_config('generic', 'loglevel') or logging.INFO self.logger = logging.getLogger(f'{self.__class__.__name__}') self.logger.setLevel(self.loglevel) self.logger.info(f'Initializing {self.__class__.__name__}') - self.process: Optional[Popen] = None + self.process: Popen | None = None # type: ignore[type-arg] self.__redis = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) self.force_stop = False @staticmethod - def is_running() -> List[Tuple[str, float]]: + def is_running() -> list[tuple[str, float]]: try: r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) for script_name, score in r.zrangebyscore('running', '-inf', '+inf', withscores=True): @@ -52,7 +54,7 @@ def is_running() -> List[Tuple[str, float]]: return [] @staticmethod - def clear_running(): + def clear_running() -> None: try: r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) r.delete('running') @@ -60,14 +62,14 @@ def clear_running(): print('Unable to connect to redis, the system is down.') @staticmethod - def force_shutdown(): + def force_shutdown() -> None: try: r = Redis(unix_socket_path=get_socket_path('cache'), db=1, decode_responses=True) r.set('shutdown', 1) except RedisConnectionError: print('Unable to connect to redis, the system is down.') - def set_running(self, number: Optional[int]=None) -> None: + def set_running(self, number: int | None=None) -> None: if number == 0: self.__redis.zrem('running', self.script_name) else: @@ -111,7 +113,7 @@ def shutdown_requested(self) -> bool: def _to_run_forever(self) -> None: raise NotImplementedError('This method must be implemented by the child') - def 
_kill_process(self): + def _kill_process(self) -> None: if self.process is None: return kill_order = [signal.SIGWINCH, signal.SIGTERM, signal.SIGINT, signal.SIGKILL] @@ -167,7 +169,7 @@ def run(self, sleep_in_sec: int) -> None: def _wait_to_finish(self) -> None: self.logger.info('Not implemented, nothing to wait for.') - async def stop(self): + async def stop(self) -> None: self.force_stop = True async def _to_run_forever_async(self) -> None: @@ -176,7 +178,7 @@ async def _to_run_forever_async(self) -> None: async def _wait_to_finish_async(self) -> None: self.logger.info('Not implemented, nothing to wait for.') - async def stop_async(self): + async def stop_async(self) -> None: """Method to pass the signal handler: loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(p.stop())) """ diff --git a/lookyloo/default/helpers.py b/lookyloo/default/helpers.py index 25e35bb2..a1ad9c41 100644 --- a/lookyloo/default/helpers.py +++ b/lookyloo/default/helpers.py @@ -1,4 +1,7 @@ #!/usr/bin/env python3 + +from __future__ import annotations + import json import logging import os @@ -9,7 +12,7 @@ from . import env_global_name from .exceptions import ConfigError, CreateDirectoryException, MissingEnv -configs: Dict[str, Dict[str, Any]] = {} +configs: dict[str, dict[str, Any]] = {} logger = logging.getLogger('Helpers') @@ -34,7 +37,7 @@ def get_homedir() -> Path: @lru_cache(64) -def load_configs(path_to_config_files: Optional[Union[str, Path]]=None): +def load_configs(path_to_config_files: str | Path | None=None) -> None: global configs if configs: return @@ -57,7 +60,7 @@ def load_configs(path_to_config_files: Optional[Union[str, Path]]=None): @lru_cache(64) -def get_config(config_type: str, entry: Optional[str]=None, quiet: bool=False) -> Any: +def get_config(config_type: str, entry: str | None=None, quiet: bool=False) -> Any: """Get an entry from the given config_type file. 
Automatic fallback to the sample file""" global configs if not configs: @@ -97,7 +100,7 @@ def get_socket_path(name: str) -> str: return str(get_homedir() / mapping[name]) -def try_make_file(filename: Path): +def try_make_file(filename: Path) -> bool: try: filename.touch(exist_ok=False) return True diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py index a0fb7e90..94257dfc 100644 --- a/lookyloo/helpers.py +++ b/lookyloo/helpers.py @@ -14,23 +14,22 @@ from urllib.parse import urlparse -from har2tree import CrawledTree, HostNode, URLNode +from har2tree import CrawledTree, HostNode, URLNode # type: ignore[attr-defined] from playwrightcapture import get_devices from publicsuffixlist import PublicSuffixList # type: ignore -from pytaxonomies import Taxonomies +from pytaxonomies import Taxonomies # type: ignore[attr-defined] from ua_parser import user_agent_parser # type: ignore from werkzeug.user_agent import UserAgent from werkzeug.utils import cached_property -from .default import get_homedir, safe_create_dir, get_config -from .exceptions import LookylooException +from .default import get_homedir, safe_create_dir, get_config, LookylooException logger = logging.getLogger('Lookyloo - Helpers') # This method is used in json.dump or json.dumps calls as the default parameter: # json.dumps(..., default=dump_to_json) -def serialize_to_json(obj: Union[Set]) -> Union[List]: +def serialize_to_json(obj: Union[Set[Any]]) -> Union[List[Any]]: if isinstance(obj, set): return sorted(obj) @@ -52,12 +51,12 @@ def get_resources_hashes(har2tree_container: Union[CrawledTree, HostNode, URLNod @lru_cache(64) -def get_taxonomies(): +def get_taxonomies() -> Taxonomies: return Taxonomies() @lru_cache(64) -def get_public_suffix_list(): +def get_public_suffix_list() -> PublicSuffixList: """Initialize Public Suffix List""" # TODO (?): fetch the list return PublicSuffixList() @@ -131,7 +130,7 @@ def get_sorted_captures_from_disk(captures_dir: Path, /, *, class UserAgents: - def __init__(self): + def __init__(self) -> None: if get_config('generic', 'use_user_agents_users'): self.path = get_homedir() / 'own_user_agents' else: @@ -145,14 +144,14 @@ def __init__(self): self.playwright_devices = get_devices() self._load_newest_ua_file(ua_files_path[0]) - def _load_newest_ua_file(self, path: Path): + def _load_newest_ua_file(self, path: Path) -> None: self.most_recent_ua_path = path with self.most_recent_ua_path.open() as f: self.most_recent_uas = json.load(f) self.by_freq = self.most_recent_uas.pop('by_frequency') self._load_playwright_devices() - def _load_playwright_devices(self): + def _load_playwright_devices(self) -> None: # Only get default and desktop for now. 
for device_name, details in self.playwright_devices['desktop']['default'].items(): parsed_ua = ParsedUserAgent(details['user_agent']) @@ -254,16 +253,17 @@ def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, L return to_return -def uniq_domains(uniq_urls): +def uniq_domains(uniq_urls: List[str]) -> Set[str]: domains = set() for url in uniq_urls: splitted = urlparse(url) - domains.add(splitted.hostname) + if splitted.hostname: + domains.add(splitted.hostname) return domains @lru_cache(64) -def get_useragent_for_requests(): +def get_useragent_for_requests() -> str: return f'Lookyloo / {version("lookyloo")}' @@ -331,11 +331,11 @@ class ParsedUserAgent(UserAgent): # from https://python.tutorialink.com/how-do-i-get-the-user-agent-with-flask/ @cached_property - def _details(self): + def _details(self) -> Dict[str, Any]: return user_agent_parser.Parse(self.string) @property - def platform(self): + def platform(self) -> Optional[str]: # type: ignore[override] return self._details['os'].get('family') @property @@ -343,11 +343,11 @@ def platform_version(self) -> Optional[str]: return self._aggregate_version(self._details['os']) @property - def browser(self): + def browser(self) -> Optional[str]: # type: ignore[override] return self._details['user_agent'].get('family') @property - def version(self): + def version(self) -> Optional[str]: # type: ignore[override] return self._aggregate_version(self._details['user_agent']) def _aggregate_version(self, details: Dict[str, str]) -> Optional[str]: @@ -357,5 +357,5 @@ def _aggregate_version(self, details: Dict[str, str]) -> Optional[str]: if (part := details.get(key)) is not None ) - def __str__(self): + def __str__(self) -> str: return f'OS: {self.platform} - Browser: {self.browser} {self.version} - UA: {self.string}' diff --git a/lookyloo/indexing.py b/lookyloo/indexing.py index 2dba8d3e..93255f18 100644 --- a/lookyloo/indexing.py +++ b/lookyloo/indexing.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import hashlib import logging # import re @@ -7,7 +9,7 @@ from typing import Dict, Iterable, List, Optional, Set, Tuple from urllib.parse import urlsplit -from har2tree import CrawledTree +from har2tree import CrawledTree # type: ignore[attr-defined] from redis import ConnectionPool, Redis from redis.connection import UnixDomainSocketConnection @@ -23,11 +25,11 @@ def __init__(self) -> None: self.redis_pool: ConnectionPool = ConnectionPool(connection_class=UnixDomainSocketConnection, path=get_socket_path('indexing'), decode_responses=True) - def clear_indexes(self): + def clear_indexes(self) -> None: self.redis.flushdb() @property - def redis(self): + def redis(self) -> Redis: # type: ignore[type-arg] return Redis(connection_pool=self.redis_pool) def new_internal_uuids(self, crawled_tree: CrawledTree) -> None: @@ -45,25 +47,25 @@ def new_internal_uuids(self, crawled_tree: CrawledTree) -> None: # ###### Cookies ###### @property - def cookies_names(self) -> List[Tuple[str, float]]: + def cookies_names(self) -> list[tuple[str, float]]: return self.redis.zrevrange('cookies_names', 0, -1, withscores=True) def cookies_names_number_domains(self, cookie_name: str) -> int: return self.redis.zcard(f'cn|{cookie_name}') - def cookies_names_domains_values(self, cookie_name: str, domain: str) -> List[Tuple[str, float]]: + def cookies_names_domains_values(self, cookie_name: str, domain: str) -> list[tuple[str, float]]: return self.redis.zrevrange(f'cn|{cookie_name}|{domain}', 0, -1, withscores=True) - def 
get_cookie_domains(self, cookie_name: str) -> List[Tuple[str, float]]: + def get_cookie_domains(self, cookie_name: str) -> list[tuple[str, float]]: return self.redis.zrevrange(f'cn|{cookie_name}', 0, -1, withscores=True) - def get_cookies_names_captures(self, cookie_name: str) -> List[Tuple[str, str]]: + def get_cookies_names_captures(self, cookie_name: str) -> list[tuple[str, str]]: return [uuids.split('|') for uuids in self.redis.smembers(f'cn|{cookie_name}|captures')] def _reindex_cookies_capture(self, crawled_tree: CrawledTree) -> None: pipeline = self.redis.pipeline() - already_loaded: Set[Tuple[str, str]] = set() - already_cleaned_up: Set[str] = set() + already_loaded: set[tuple[str, str]] = set() + already_cleaned_up: set[str] = set() for urlnode in crawled_tree.root_hartree.url_tree.traverse(): if 'cookies_received' not in urlnode.features: continue @@ -90,7 +92,7 @@ def index_cookies_capture(self, crawled_tree: CrawledTree) -> None: self.redis.sadd('indexed_cookies', crawled_tree.uuid) pipeline = self.redis.pipeline() - already_loaded: Set[Tuple[str, str]] = set() + already_loaded: set[tuple[str, str]] = set() for urlnode in crawled_tree.root_hartree.url_tree.traverse(): if 'cookies_received' not in urlnode.features: continue @@ -131,13 +133,13 @@ def aggregate_domain_cookies(self): # ###### Body hashes ###### @property - def ressources(self) -> List[Tuple[str, float]]: + def ressources(self) -> list[tuple[str, float]]: return self.redis.zrevrange('body_hashes', 0, 200, withscores=True) def ressources_number_domains(self, h: str) -> int: return self.redis.zcard(f'bh|{h}') - def body_hash_fequency(self, body_hash: str) -> Dict[str, int]: + def body_hash_fequency(self, body_hash: str) -> dict[str, int]: pipeline = self.redis.pipeline() pipeline.zscore('body_hashes', body_hash) pipeline.zcard(f'bh|{body_hash}') @@ -151,7 +153,7 @@ def body_hash_fequency(self, body_hash: str) -> Dict[str, int]: def _reindex_body_hashes_capture(self, crawled_tree: CrawledTree) -> None: # if the capture is regenerated, the hostnodes/urlnodes UUIDs are changed - cleaned_up_hashes: Set[str] = set() + cleaned_up_hashes: set[str] = set() pipeline = self.redis.pipeline() for urlnode in crawled_tree.root_hartree.url_tree.traverse(): for h in urlnode.resources_hashes: @@ -181,17 +183,17 @@ def index_body_hashes_capture(self, crawled_tree: CrawledTree) -> None: f'{urlnode.uuid}|{urlnode.hostnode_uuid}|{urlnode.name}') pipeline.execute() - def get_hash_uuids(self, body_hash: str) -> Tuple[str, str, str]: + def get_hash_uuids(self, body_hash: str) -> tuple[str, str, str]: """Use that to get a reference allowing to fetch a resource from one of the capture.""" - capture_uuid: str = self.redis.srandmember(f'bh|{body_hash}|captures') + capture_uuid = str(self.redis.srandmember(f'bh|{body_hash}|captures')) entry = self.redis.zrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, 1)[0] urlnode_uuid, hostnode_uuid, url = entry.split('|', 2) return capture_uuid, urlnode_uuid, hostnode_uuid - def get_body_hash_captures(self, body_hash: str, filter_url: Optional[str]=None, - filter_capture_uuid: Optional[str]=None, + def get_body_hash_captures(self, body_hash: str, filter_url: str | None=None, + filter_capture_uuid: str | None=None, limit: int=20, - prefered_uuids: Set[str]=set()) -> Tuple[int, List[Tuple[str, str, str, bool]]]: + prefered_uuids: set[str]=set()) -> tuple[int, list[tuple[str, str, str, bool]]]: '''Get the captures matching the hash. 
:param filter_url: URL of the hash we're searching for @@ -199,7 +201,7 @@ def get_body_hash_captures(self, body_hash: str, filter_url: Optional[str]=None, :param limit: Max matching captures to return, -1 means unlimited. :param prefered_uuids: UUID cached right now, so we don't rebuild trees. ''' - to_return: List[Tuple[str, str, str, bool]] = [] + to_return: list[tuple[str, str, str, bool]] = [] len_captures = self.redis.scard(f'bh|{body_hash}|captures') unlimited = False if limit == -1: @@ -224,11 +226,11 @@ def get_body_hash_captures(self, body_hash: str, filter_url: Optional[str]=None, break return len_captures, to_return - def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]: + def get_body_hash_domains(self, body_hash: str) -> list[tuple[str, float]]: return self.redis.zrevrange(f'bh|{body_hash}', 0, -1, withscores=True) - def get_body_hash_urls(self, body_hash: str) -> Dict[str, List[Dict[str, str]]]: - all_captures: Set[str] = self.redis.smembers(f'bh|{body_hash}|captures') + def get_body_hash_urls(self, body_hash: str) -> dict[str, list[dict[str, str]]]: + all_captures: set[str] = self.redis.smembers(f'bh|{body_hash}|captures') urls = defaultdict(list) for capture_uuid in list(all_captures): for entry in self.redis.zrevrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, -1): @@ -239,19 +241,19 @@ def get_body_hash_urls(self, body_hash: str) -> Dict[str, List[Dict[str, str]]]: # ###### HTTP Headers Hashes ###### @property - def http_headers_hashes(self) -> List[Tuple[str, float]]: + def http_headers_hashes(self) -> list[tuple[str, float]]: return self.redis.zrevrange('hhhashes', 0, -1, withscores=True) def http_headers_hashes_number_captures(self, hhh: str) -> int: return self.redis.scard(f'hhhashes|{hhh}|captures') - def get_http_headers_hashes_captures(self, hhh: str) -> List[Tuple[str, str]]: + def get_http_headers_hashes_captures(self, hhh: str) -> list[tuple[str, str]]: return [uuids.split('|') for uuids in self.redis.smembers(f'hhhashes|{hhh}|captures')] def _reindex_http_headers_hashes_capture(self, crawled_tree: CrawledTree) -> None: pipeline = self.redis.pipeline() - already_loaded: Set[str] = set() - already_cleaned_up: Set[str] = set() + already_loaded: set[str] = set() + already_cleaned_up: set[str] = set() for urlnode in crawled_tree.root_hartree.url_tree.traverse(): if 'hhhash' not in urlnode.features: continue @@ -276,7 +278,7 @@ def index_http_headers_hashes_capture(self, crawled_tree: CrawledTree) -> None: self.redis.sadd('indexed_hhhashes', crawled_tree.uuid) pipeline = self.redis.pipeline() - already_loaded: Set[str] = set() + already_loaded: set[str] = set() for urlnode in crawled_tree.root_hartree.url_tree.traverse(): if 'hhhash' not in urlnode.features: continue @@ -291,11 +293,11 @@ def index_http_headers_hashes_capture(self, crawled_tree: CrawledTree) -> None: # ###### URLs and Domains ###### @property - def urls(self) -> List[Tuple[str, float]]: + def urls(self) -> list[tuple[str, float]]: return self.redis.zrevrange('urls', 0, 200, withscores=True) @property - def hostnames(self) -> List[Tuple[str, float]]: + def hostnames(self) -> list[tuple[str, float]]: return self.redis.zrevrange('hostnames', 0, 200, withscores=True) def index_url_capture(self, crawled_tree: CrawledTree) -> None: @@ -316,21 +318,21 @@ def index_url_capture(self, crawled_tree: CrawledTree) -> None: pipeline.sadd(f'urls|{md5}|captures', crawled_tree.uuid) pipeline.execute() - def get_captures_url(self, url: str) -> Set[str]: + def get_captures_url(self, url: str) 
-> set[str]: md5 = hashlib.md5(url.encode()).hexdigest() return self.redis.smembers(f'urls|{md5}|captures') - def get_captures_hostname(self, hostname: str) -> Set[str]: + def get_captures_hostname(self, hostname: str) -> set[str]: return self.redis.smembers(f'hostnames|{hostname}|captures') # ###### Categories ###### @property - def categories(self) -> List[Tuple[str, int]]: + def categories(self) -> list[tuple[str, int]]: return [(c, int(score)) for c, score in self.redis.zrevrange('categories', 0, 200, withscores=True)] - def index_categories_capture(self, capture_uuid: str, categories: Iterable[str]): + def index_categories_capture(self, capture_uuid: str, categories: Iterable[str]) -> None: if not categories: return if self.redis.sismember('indexed_categories', capture_uuid): @@ -345,5 +347,5 @@ def index_categories_capture(self, capture_uuid: str, categories: Iterable[str]) pipeline.sadd(category, capture_uuid) pipeline.execute() - def get_captures_category(self, category: str) -> Set[str]: + def get_captures_category(self, category: str) -> set[str]: return self.redis.smembers(category) diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index d5c6e7e0..9bc19399 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import base64 import copy import gzip @@ -22,7 +24,7 @@ from zipfile import ZipFile from defang import defang # type: ignore -from har2tree import CrawledTree, HostNode, URLNode +from har2tree import CrawledTree, HostNode, URLNode # type: ignore[attr-defined] from lacuscore import (LacusCore, CaptureStatus as CaptureStatusCore, # CaptureResponse as CaptureResponseCore) @@ -30,15 +32,15 @@ CaptureSettings as CaptureSettingsCore) from PIL import Image, UnidentifiedImageError from playwrightcapture import get_devices -from pylacus import (PyLacus, +from pylacus import (PyLacus, # type: ignore[attr-defined] CaptureStatus as CaptureStatusPy # CaptureResponse as CaptureResponsePy, # CaptureResponseJson as CaptureResponseJsonPy, # CaptureSettings as CaptureSettingsPy ) -from pymisp import MISPAttribute, MISPEvent, MISPObject -from pysecuritytxt import PySecurityTXT, SecurityTXTNotAvailable -from pylookyloomonitoring import PyLookylooMonitoring +from pymisp import MISPAttribute, MISPEvent, MISPObject # type: ignore[attr-defined] +from pysecuritytxt import PySecurityTXT, SecurityTXTNotAvailable # type: ignore[attr-defined] +from pylookyloomonitoring import PyLookylooMonitoring # type: ignore[attr-defined] from redis import ConnectionPool, Redis from redis.connection import UnixDomainSocketConnection @@ -62,13 +64,13 @@ class CaptureSettings(CaptureSettingsCore, total=False): '''The capture settings that can be passed to Lookyloo''' - listing: Optional[int] - not_queued: Optional[int] - auto_report: Optional[Union[bool, str, Dict[str, str]]] - dnt: Optional[str] - browser_name: Optional[str] - os: Optional[str] - parent: Optional[str] + listing: int | None + not_queued: int | None + auto_report: bool | str | dict[str, str] | None + dnt: str | None + browser_name: str | None + os: str | None + parent: str | None class Lookyloo(): @@ -153,13 +155,13 @@ def __init__(self) -> None: self.lacus @property - def redis(self): + def redis(self) -> Redis: # type: ignore[type-arg] return Redis(connection_pool=self.redis_pool) @cached_property - def lacus(self): + def lacus(self) -> PyLacus | LacusCore: has_remote_lacus = False - self._lacus: Union[PyLacus, LacusCore] + self._lacus: PyLacus | LacusCore if 
get_config('generic', 'remote_lacus'): remote_lacus_config = get_config('generic', 'remote_lacus') if remote_lacus_config.get('enable'): @@ -180,7 +182,7 @@ def lacus(self): if not has_remote_lacus: # We need a redis connector that doesn't decode. - redis: Redis = Redis(unix_socket_path=get_socket_path('cache')) + redis: Redis = Redis(unix_socket_path=get_socket_path('cache')) # type: ignore[type-arg] self._lacus = LacusCore(redis, tor_proxy=get_config('generic', 'tor_proxy'), max_capture_time=get_config('generic', 'max_capture_time'), only_global_lookups=get_config('generic', 'only_global_lookups'), @@ -188,14 +190,14 @@ def lacus(self): return self._lacus def add_context(self, capture_uuid: str, /, urlnode_uuid: str, *, ressource_hash: str, - legitimate: bool, malicious: bool, details: Dict[str, Dict[str, str]]): + legitimate: bool, malicious: bool, details: dict[str, dict[str, str]]) -> None: '''Adds context information to a capture or a URL node''' if malicious: self.context.add_malicious(ressource_hash, details['malicious']) if legitimate: self.context.add_legitimate(ressource_hash, details['legitimate']) - def add_to_legitimate(self, capture_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None): + def add_to_legitimate(self, capture_uuid: str, /, hostnode_uuid: str | None=None, urlnode_uuid: str | None=None) -> None: '''Mark a full capture as legitimate. Iterates over all the nodes and mark them all as legitimate too.''' ct = self.get_crawled_tree(capture_uuid) @@ -225,12 +227,12 @@ def get_hostnode_from_tree(self, capture_uuid: str, /, node_uuid: str) -> HostNo ct = self.get_crawled_tree(capture_uuid) return ct.root_hartree.get_host_node_by_uuid(node_uuid) - def get_statistics(self, capture_uuid: str, /) -> Dict[str, Any]: + def get_statistics(self, capture_uuid: str, /) -> dict[str, Any]: '''Get the statistics of a capture.''' ct = self.get_crawled_tree(capture_uuid) return ct.root_hartree.stats - def get_info(self, capture_uuid: str, /) -> Dict[str, Any]: + def get_info(self, capture_uuid: str, /) -> dict[str, Any]: '''Get basic information about the capture.''' cache = self.capture_cache(capture_uuid) if not cache: @@ -254,7 +256,7 @@ def get_info(self, capture_uuid: str, /) -> Dict[str, Any]: to_return['referer'] = cache.referer if cache.referer else '' return to_return - def get_meta(self, capture_uuid: str, /) -> Dict[str, str]: + def get_meta(self, capture_uuid: str, /) -> dict[str, str]: '''Get the meta informations from a capture (mostly, details about the User Agent used.)''' cache = self.capture_cache(capture_uuid) if not cache: @@ -294,7 +296,7 @@ def get_capture_settings(self, capture_uuid: str, /) -> CaptureSettings: return json.load(f) return {} - def categories_capture(self, capture_uuid: str, /) -> Dict[str, Any]: + def categories_capture(self, capture_uuid: str, /) -> dict[str, Any]: '''Get all the categories related to a capture, in MISP Taxonomies format''' categ_file = self._captures_index[capture_uuid].capture_dir / 'categories' # get existing categories if possible @@ -337,7 +339,7 @@ def uncategorize_capture(self, capture_uuid: str, /, category: str) -> None: with categ_file.open('w') as f: f.writelines(f'{t}\n' for t in current_categories) - def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False) -> Dict: + def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False) -> dict[str, Any]: '''Launch the 3rd party modules on a capture. 
It uses the cached result *if* the module was triggered the same day. The `force` flag re-triggers the module regardless of the cache.''' @@ -350,8 +352,8 @@ def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: self.uwhois.capture_default_trigger(ct, force=force, auto_trigger=auto_trigger) self.hashlookup.capture_default_trigger(ct, auto_trigger=auto_trigger) - to_return: Dict[str, Dict] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {}, - 'URLhaus': {}} + to_return: dict[str, dict[str, Any]] = {'PhishingInitiative': {}, 'VirusTotal': {}, 'UrlScan': {}, + 'URLhaus': {}} if cache := self.capture_cache(capture_uuid): to_return['PhishingInitiative'] = self.pi.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger) to_return['VirusTotal'] = self.vt.capture_default_trigger(cache, force=force, auto_trigger=auto_trigger) @@ -363,7 +365,7 @@ def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: to_return['URLhaus'] = self.urlhaus.capture_default_trigger(cache, auto_trigger=auto_trigger) return to_return - def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]: + def get_modules_responses(self, capture_uuid: str, /) -> dict[str, Any] | None: '''Get the responses of the modules from the cached responses on the disk''' cache = self.capture_cache(capture_uuid) if not cache: @@ -373,7 +375,7 @@ def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any] self.logger.warning(f'The capture {capture_uuid} does not have a URL in the cache, it is broken.') return None - to_return: Dict[str, Any] = {} + to_return: dict[str, Any] = {} if self.vt.available: to_return['vt'] = {} if hasattr(cache, 'redirects') and cache.redirects: @@ -416,7 +418,7 @@ def get_modules_responses(self, capture_uuid: str, /) -> Optional[Dict[str, Any] to_return['urlscan']['result'] = result return to_return - def get_historical_lookups(self, capture_uuid: str, /, force: bool=False) -> Dict: + def get_historical_lookups(self, capture_uuid: str, /, force: bool=False) -> dict[str, Any]: # this method is only trigered when the user wants to get more details about the capture # by looking at Passive DNS systems, check if there are hits in the current capture # in another one and things like that. The trigger_modules method is for getting @@ -425,7 +427,7 @@ def get_historical_lookups(self, capture_uuid: str, /, force: bool=False) -> Dic if not cache: self.logger.warning(f'Unable to get the modules responses unless the capture {capture_uuid} is cached') return {} - to_return: Dict[str, Any] = defaultdict(dict) + to_return: dict[str, Any] = defaultdict(dict) if self.riskiq.available: try: self.riskiq.capture_default_trigger(cache) @@ -461,7 +463,7 @@ def hide_capture(self, capture_uuid: str, /) -> None: def update_tree_cache_info(self, process_id: int, classname: str) -> None: self.redis.hset('tree_cache', f'{process_id}|{classname}', str(self._captures_index.lru_cache_status())) - def sorted_capture_cache(self, capture_uuids: Optional[Iterable[str]]=None, cached_captures_only: bool=True, index_cut_time: Optional[datetime]=None) -> List[CaptureCache]: + def sorted_capture_cache(self, capture_uuids: Iterable[str] | None=None, cached_captures_only: bool=True, index_cut_time: datetime | None=None) -> list[CaptureCache]: '''Get all the captures in the cache, sorted by timestamp (new -> old). 
By default, this method will only return the captures that are currently cached.''' # Make sure we do not try to load archived captures that would still be in 'lookup_dirs' @@ -489,13 +491,13 @@ def sorted_capture_cache(self, capture_uuids: Optional[Iterable[str]]=None, cach # Do not try to build pickles capture_uuids = set(capture_uuids) & self._captures_index.cached_captures - all_cache: List[CaptureCache] = [self._captures_index[uuid] for uuid in capture_uuids + all_cache: list[CaptureCache] = [self._captures_index[uuid] for uuid in capture_uuids if self.capture_cache(uuid) and hasattr(self._captures_index[uuid], 'timestamp')] all_cache.sort(key=operator.attrgetter('timestamp'), reverse=True) return all_cache - def get_capture_status(self, capture_uuid: str, /) -> Union[CaptureStatusCore, CaptureStatusPy]: + def get_capture_status(self, capture_uuid: str, /) -> CaptureStatusCore | CaptureStatusPy: '''Returns the status (queued, ongoing, done, or UUID unknown)''' if self.redis.hexists('lookup_dirs', capture_uuid): return CaptureStatusCore.DONE @@ -520,7 +522,7 @@ def get_capture_status(self, capture_uuid: str, /) -> Union[CaptureStatusCore, C return CaptureStatusCore.ONGOING return lacus_status - def capture_cache(self, capture_uuid: str, /, *, force_update: bool = False) -> Optional[CaptureCache]: + def capture_cache(self, capture_uuid: str, /, *, force_update: bool = False) -> CaptureCache | None: """Get the cache from redis, rebuild the tree if the internal UUID changed => slow""" try: cache = self._captures_index[capture_uuid] @@ -598,7 +600,7 @@ def _prepare_lacus_query(self, query: CaptureSettings) -> CaptureSettings: query['user_agent'] = user_agent if user_agent else self.user_agents.default['useragent'] # NOTE: the document must be base64 encoded - document: Optional[Union[str, bytes]] = query.pop('document', None) + document: str | bytes | None = query.pop('document', None) if document: if isinstance(document, bytes): query['document'] = base64.b64encode(document).decode() @@ -631,17 +633,16 @@ def get_priority(source: str, user: str, authenticated: bool) -> int: query = self._prepare_lacus_query(query) priority = get_priority(source, user, authenticated) - query['priority'] = priority if priority < -100: # Someone is probably abusing the system with useless URLs, remove them from the index query['listing'] = 0 try: - perma_uuid = self.lacus.enqueue( + perma_uuid = self.lacus.enqueue( # type: ignore[misc] url=query.get('url', None), document_name=query.get('document_name', None), document=query.get('document', None), # depth=query.get('depth', 0), - browser=query.get('browser', None), + browser=query.get('browser', None), # type: ignore[arg-type] device_name=query.get('device_name', None), user_agent=query.get('user_agent', None), proxy=self.global_proxy if self.global_proxy else query.get('proxy', None), @@ -659,7 +660,7 @@ def get_priority(source: str, user: str, authenticated: bool) -> int: with_favicon=query.get('with_favicon', True), # force=query.get('force', False), # recapture_interval=query.get('recapture_interval', 300), - priority=query.get('priority', 0) + priority=priority ) except Exception as e: self.logger.critical(f'Unable to enqueue capture: {e}') @@ -670,7 +671,7 @@ def get_priority(source: str, user: str, authenticated: bool) -> int: and self.redis.zscore('to_capture', perma_uuid) is None): # capture ongoing # Make the settings redis compatible - mapping_capture: Dict[str, Union[bytes, float, int, str]] = {} + mapping_capture: dict[str, bytes | float | int | 
str] = {} for key, value in query.items(): if isinstance(value, bool): mapping_capture[key] = 1 if value else 0 @@ -681,15 +682,15 @@ def get_priority(source: str, user: str, authenticated: bool) -> int: mapping_capture[key] = value # type: ignore p = self.redis.pipeline() - p.zadd('to_capture', {perma_uuid: query['priority']}) - p.hset(perma_uuid, mapping=mapping_capture) + p.zadd('to_capture', {perma_uuid: priority}) + p.hset(perma_uuid, mapping=mapping_capture) # type: ignore[arg-type] p.zincrby('queues', 1, f'{source}|{authenticated}|{user}') p.set(f'{perma_uuid}_mgmt', f'{source}|{authenticated}|{user}') p.execute() return perma_uuid - def takedown_details(self, hostnode: HostNode) -> Dict[str, Any]: + def takedown_details(self, hostnode: HostNode) -> dict[str, Any]: if not self.uwhois.available: self.logger.warning('UWhois module not enabled, unable to use this method') raise LookylooException('UWhois module not enabled, unable to use this method') @@ -740,7 +741,7 @@ def takedown_details(self, hostnode: HostNode) -> Dict[str, Any]: to_return['all_emails'] = list(to_return['all_emails']) return to_return - def contacts(self, capture_uuid: str, /) -> List[Dict[str, Any]]: + def contacts(self, capture_uuid: str, /) -> list[dict[str, Any]]: capture = self.get_crawled_tree(capture_uuid) rendered_hostnode = self.get_hostnode_from_tree(capture_uuid, capture.root_hartree.rendered_node.hostnode_uuid) result = [] @@ -749,7 +750,7 @@ def contacts(self, capture_uuid: str, /) -> List[Dict[str, Any]]: result.append(self.takedown_details(rendered_hostnode)) return result - def send_mail(self, capture_uuid: str, /, email: str='', comment: Optional[str]=None) -> None: + def send_mail(self, capture_uuid: str, /, email: str='', comment: str | None=None) -> None: '''Send an email notification regarding a specific capture''' if not get_config('generic', 'enable_mail_notification'): return @@ -856,7 +857,7 @@ def get_potential_favicons(self, capture_uuid: str, /, all_favicons: Literal[Fal def get_potential_favicons(self, capture_uuid: str, /, all_favicons: Literal[True], for_datauri: Literal[False]) -> BytesIO: ... - def get_potential_favicons(self, capture_uuid: str, /, all_favicons: bool=False, for_datauri: bool=False) -> Union[BytesIO, str]: + def get_potential_favicons(self, capture_uuid: str, /, all_favicons: bool=False, for_datauri: bool=False) -> BytesIO | str: '''Get rendered HTML''' fav = self._get_raw(capture_uuid, 'potential_favicons.ico', all_favicons) if not all_favicons and for_datauri: @@ -867,7 +868,7 @@ def get_html(self, capture_uuid: str, /, all_html: bool=False) -> BytesIO: '''Get rendered HTML''' return self._get_raw(capture_uuid, 'html', all_html) - def get_data(self, capture_uuid: str, /) -> Tuple[str, BytesIO]: + def get_data(self, capture_uuid: str, /) -> tuple[str, BytesIO]: '''Get the data''' return self._get_raw(capture_uuid, 'data.filename', False).getvalue().decode(), self._get_raw(capture_uuid, 'data', False) @@ -879,7 +880,7 @@ def get_screenshot(self, capture_uuid: str, /) -> BytesIO: '''Get the screenshot(s) of the rendered page''' return self._get_raw(capture_uuid, 'png', all_files=False) - def get_screenshot_thumbnail(self, capture_uuid: str, /, for_datauri: bool=False, width: int=64) -> Union[str, BytesIO]: + def get_screenshot_thumbnail(self, capture_uuid: str, /, for_datauri: bool=False, width: int=64) -> str | BytesIO: '''Get the thumbnail of the rendered page. 
Always crop to a square.''' to_return = BytesIO() size = width, width @@ -921,12 +922,12 @@ def get_capture(self, capture_uuid: str, /) -> BytesIO: '''Get all the files related to this capture.''' return self._get_raw(capture_uuid) - def get_urls_rendered_page(self, capture_uuid: str, /) -> List[str]: + def get_urls_rendered_page(self, capture_uuid: str, /) -> list[str]: ct = self.get_crawled_tree(capture_uuid) return sorted(set(ct.root_hartree.rendered_node.urls_in_rendered_page) - set(ct.root_hartree.all_url_requests.keys())) - def get_body_hash_investigator(self, body_hash: str, /) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]: + def get_body_hash_investigator(self, body_hash: str, /) -> tuple[list[tuple[str, str]], list[tuple[str, float]]]: '''Returns all the captures related to a hash (sha512), used in the web interface.''' total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1) cached_captures = self.sorted_capture_cache([d[0] for d in details]) @@ -934,7 +935,7 @@ def get_body_hash_investigator(self, body_hash: str, /) -> Tuple[List[Tuple[str, domains = self.indexing.get_body_hash_domains(body_hash) return captures, domains - def get_body_hash_full(self, body_hash: str, /) -> Tuple[Dict[str, List[Dict[str, str]]], BytesIO]: + def get_body_hash_full(self, body_hash: str, /) -> tuple[dict[str, list[dict[str, str]]], BytesIO]: '''Returns a lot of information about the hash (sha512) and the hits in the instance. Also contains the data (base64 encoded)''' details = self.indexing.get_body_hash_urls(body_hash) @@ -969,9 +970,9 @@ def get_body_hash_full(self, body_hash: str, /) -> Tuple[Dict[str, List[Dict[str # TODO: Couldn't find the file anywhere. Maybe return a warning in the file? return details, BytesIO() - def get_all_body_hashes(self, capture_uuid: str, /) -> Dict[str, Dict[str, Union[URLNode, int]]]: + def get_all_body_hashes(self, capture_uuid: str, /) -> dict[str, dict[str, URLNode | int]]: ct = self.get_crawled_tree(capture_uuid) - to_return: Dict[str, Dict[str, Union[URLNode, int]]] = defaultdict() + to_return: dict[str, dict[str, URLNode | int]] = defaultdict() for node in ct.root_hartree.url_tree.traverse(): if node.empty_response or node.body_hash in to_return: # If we have the same hash more than once, skip @@ -981,24 +982,24 @@ def get_all_body_hashes(self, capture_uuid: str, /) -> Dict[str, Dict[str, Union to_return[node.body_hash] = {'node': node, 'total_captures': total_captures} return to_return - def get_latest_url_capture(self, url: str, /) -> Optional[CaptureCache]: + def get_latest_url_capture(self, url: str, /) -> CaptureCache | None: '''Get the most recent capture with this URL''' captures = self.sorted_capture_cache(self.indexing.get_captures_url(url)) if captures: return captures[0] return None - def get_url_occurrences(self, url: str, /, limit: int=20, cached_captures_only: bool=True) -> List[Dict]: + def get_url_occurrences(self, url: str, /, limit: int=20, cached_captures_only: bool=True) -> list[dict[str, Any]]: '''Get the most recent captures and URL nodes where the URL has been seen.''' captures = self.sorted_capture_cache(self.indexing.get_captures_url(url), cached_captures_only=cached_captures_only) - to_return: List[Dict] = [] + to_return: list[dict[str, Any]] = [] for capture in captures[:limit]: ct = self.get_crawled_tree(capture.uuid) - to_append: Dict[str, Union[str, Dict]] = {'capture_uuid': capture.uuid, - 'start_timestamp': capture.timestamp.isoformat(), - 'title': capture.title} - urlnodes: Dict[str, 
Dict[str, str]] = {} + to_append: dict[str, str | dict[str, Any]] = {'capture_uuid': capture.uuid, + 'start_timestamp': capture.timestamp.isoformat(), + 'title': capture.title} + urlnodes: dict[str, dict[str, str]] = {} for urlnode in ct.root_hartree.url_tree.search_nodes(name=url): urlnodes[urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(), 'hostnode_uuid': urlnode.hostnode_uuid} @@ -1008,19 +1009,20 @@ def get_url_occurrences(self, url: str, /, limit: int=20, cached_captures_only: to_return.append(to_append) return to_return - def get_hostname_occurrences(self, hostname: str, /, with_urls_occurrences: bool=False, limit: int=20, cached_captures_only: bool=True) -> List[Dict]: + def get_hostname_occurrences(self, hostname: str, /, with_urls_occurrences: bool=False, limit: int=20, cached_captures_only: bool=True) -> list[dict[str, Any]]: '''Get the most recent captures and URL nodes where the hostname has been seen.''' captures = self.sorted_capture_cache(self.indexing.get_captures_hostname(hostname), cached_captures_only=cached_captures_only) - to_return: List[Dict] = [] + to_return: list[dict[str, Any]] = [] for capture in captures[:limit]: ct = self.get_crawled_tree(capture.uuid) - to_append: Dict[str, Union[str, List, Dict]] = {'capture_uuid': capture.uuid, - 'start_timestamp': capture.timestamp.isoformat(), - 'title': capture.title} - hostnodes: List[str] = [] + to_append: dict[str, str | list[Any] | dict[str, Any]] = { + 'capture_uuid': capture.uuid, + 'start_timestamp': capture.timestamp.isoformat(), + 'title': capture.title} + hostnodes: list[str] = [] if with_urls_occurrences: - urlnodes: Dict[str, Dict[str, str]] = {} + urlnodes: dict[str, dict[str, str]] = {} for hostnode in ct.root_hartree.hostname_tree.search_nodes(name=hostname): hostnodes.append(hostnode.uuid) if with_urls_occurrences: @@ -1036,7 +1038,7 @@ def get_hostname_occurrences(self, hostname: str, /, with_urls_occurrences: bool to_return.append(to_append) return to_return - def get_cookie_name_investigator(self, cookie_name: str, /) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float, List[Tuple[str, float]]]]]: + def get_cookie_name_investigator(self, cookie_name: str, /) -> tuple[list[tuple[str, str]], list[tuple[str, float, list[tuple[str, float]]]]]: '''Returns all the captures related to a cookie name entry, used in the web interface.''' cached_captures = self.sorted_capture_cache([entry[0] for entry in self.indexing.get_cookies_names_captures(cookie_name)]) captures = [(cache.uuid, cache.title) for cache in cached_captures] @@ -1044,7 +1046,7 @@ def get_cookie_name_investigator(self, cookie_name: str, /) -> Tuple[List[Tuple[ for domain, freq in self.indexing.get_cookie_domains(cookie_name)] return captures, domains - def get_hhh_investigator(self, hhh: str, /) -> Tuple[List[Tuple[str, str, str, str]], List[Tuple[str, str]]]: + def get_hhh_investigator(self, hhh: str, /) -> tuple[list[tuple[str, str, str, str]], list[tuple[str, str]]]: '''Returns all the captures related to a cookie name entry, used in the web interface.''' all_captures = dict(self.indexing.get_http_headers_hashes_captures(hhh)) if cached_captures := self.sorted_capture_cache([entry for entry in all_captures]): @@ -1063,11 +1065,11 @@ def get_hhh_investigator(self, hhh: str, /) -> Tuple[List[Tuple[str, str, str, s return captures, headers return [], [] - def hash_lookup(self, blob_hash: str, url: str, capture_uuid: str) -> Tuple[int, Dict[str, List[Tuple[str, str, str, str, str]]]]: + def hash_lookup(self, blob_hash: str, url: str, 
capture_uuid: str) -> tuple[int, dict[str, list[tuple[str, str, str, str, str]]]]: '''Search all the captures where a specific hash was seen. If a URL is given, it splits the results depending on whether the hash was seen on the same URL or on another one. Capture UUID avoids duplicates on the same capture''' - captures_list: Dict[str, List[Tuple[str, str, str, str, str]]] = {'same_url': [], 'different_url': []} + captures_list: dict[str, list[tuple[str, str, str, str, str]]] = {'same_url': [], 'different_url': []} total_captures, details = self.indexing.get_body_hash_captures(blob_hash, url, filter_capture_uuid=capture_uuid, limit=-1, prefered_uuids=set(self._captures_index.keys())) for h_capture_uuid, url_uuid, url_hostname, same_url in details: @@ -1082,7 +1084,7 @@ def hash_lookup(self, blob_hash: str, url: str, capture_uuid: str) -> Tuple[int, captures_list['different_url'].sort(key=lambda y: y[3]) return total_captures, captures_list - def get_ressource(self, tree_uuid: str, /, urlnode_uuid: str, h: Optional[str]) -> Optional[Tuple[str, BytesIO, str]]: + def get_ressource(self, tree_uuid: str, /, urlnode_uuid: str, h: str | None) -> tuple[str, BytesIO, str] | None: '''Get a specific resource from a URL node. If a hash is also given, we want an embedded resource''' try: url = self.get_urlnode_from_tree(tree_uuid, urlnode_uuid) @@ -1108,7 +1110,7 @@ def get_ressource(self, tree_uuid: str, /, urlnode_uuid: str, h: Optional[str]) return 'embedded_ressource.bin', BytesIO(blob.getvalue()), mimetype return None - def __misp_add_vt_to_URLObject(self, obj: MISPObject) -> Optional[MISPObject]: + def __misp_add_vt_to_URLObject(self, obj: MISPObject) -> MISPObject | None: urls = obj.get_attributes_by_relation('url') if not urls: return None @@ -1124,7 +1126,7 @@ def __misp_add_vt_to_URLObject(self, obj: MISPObject) -> Optional[MISPObject]: obj.add_reference(vt_obj, 'analysed-with') return vt_obj - def __misp_add_urlscan_to_event(self, capture_uuid: str, visibility: str) -> Optional[MISPAttribute]: + def __misp_add_urlscan_to_event(self, capture_uuid: str, visibility: str) -> MISPAttribute | None: if cache := self.capture_cache(capture_uuid): response = self.urlscan.url_submit(cache, visibility) if 'result' in response: @@ -1134,7 +1136,7 @@ def __misp_add_urlscan_to_event(self, capture_uuid: str, visibility: str) -> Opt return attribute return None - def misp_export(self, capture_uuid: str, /, with_parent: bool=False) -> Union[List[MISPEvent], Dict[str, str]]: + def misp_export(self, capture_uuid: str, /, with_parent: bool=False) -> list[MISPEvent] | dict[str, str]: '''Export a capture in MISP format.
You can POST the return of this method directly to a MISP instance and it will create an event.''' cache = self.capture_cache(capture_uuid) @@ -1200,7 +1202,7 @@ def misp_export(self, capture_uuid: str, /, with_parent: bool=False) -> Union[Li return [event] - def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: Optional[str]=None) -> Optional[Tuple[Dict[str, Set[str]], str]]: + def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | None=None) -> tuple[dict[str, set[str]], str] | None: if instance_name is None: misp = self.misps.default_misp elif self.misps.get(instance_name) is not None: @@ -1217,7 +1219,7 @@ def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: Optional[ self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.') return None nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node] - to_return: Dict[str, Set[str]] = defaultdict(set) + to_return: dict[str, set[str]] = defaultdict(set) for node in nodes_to_lookup: hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid)) for event_id, values in hits.items(): @@ -1226,7 +1228,7 @@ def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: Optional[ to_return[event_id].update(values) return to_return, misp.client.root_url - def get_hashes_with_context(self, tree_uuid: str, /, algorithm: str, *, urls_only: bool=False) -> Union[Dict[str, Set[str]], Dict[str, List[URLNode]]]: + def get_hashes_with_context(self, tree_uuid: str, /, algorithm: str, *, urls_only: bool=False) -> dict[str, set[str]] | dict[str, list[URLNode]]: """Build (on demand) hashes for all the resources of the tree, using the algorithm provided by the user. If you just want the hashes in SHA512, use the get_hashes method, it gives you a list of hashes and they're built with the tree. This method computes the hashes when you query it, so it is slower.""" @@ -1236,7 +1238,7 @@ def get_hashes_with_context(self, tree_uuid: str, /, algorithm: str, *, urls_onl return {h: {node.name for node in nodes} for h, nodes in hashes.items()} return hashes - def merge_hashlookup_tree(self, tree_uuid: str, /) -> Tuple[Dict[str, Dict[str, Any]], int]: + def merge_hashlookup_tree(self, tree_uuid: str, /) -> tuple[dict[str, dict[str, Any]], int]: if not self.hashlookup.available: raise LookylooException('Hashlookup module not enabled.') hashes_tree = self.get_hashes_with_context(tree_uuid, algorithm='sha1') @@ -1253,20 +1255,20 @@ def merge_hashlookup_tree(self, tree_uuid: str, /) -> Tuple[Dict[str, Dict[str, with hashlookup_file.open() as f: hashlookup_entries = json.load(f) - to_return: Dict[str, Dict[str, Any]] = defaultdict(dict) + to_return: dict[str, dict[str, Any]] = defaultdict(dict) for sha1 in hashlookup_entries.keys(): to_return[sha1]['nodes'] = hashes_tree[sha1] to_return[sha1]['hashlookup'] = hashlookup_entries[sha1] return to_return, len(hashes_tree) - def get_hashes(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]: + def get_hashes(self, tree_uuid: str, /, hostnode_uuid: str | None=None, urlnode_uuid: str | None=None) -> set[str]: """Return hashes (sha512) of resources.
Only tree_uuid: All the hashes tree_uuid and hostnode_uuid: hashes of all the resources in that hostnode (including embedded ressources) tree_uuid, hostnode_uuid, and urlnode_uuid: hash of the URL node body, and embedded resources """ - container: Union[CrawledTree, HostNode, URLNode] + container: CrawledTree | HostNode | URLNode if urlnode_uuid: container = self.get_urlnode_from_tree(tree_uuid, urlnode_uuid) elif hostnode_uuid: @@ -1275,7 +1277,7 @@ def get_hashes(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlno container = self.get_crawled_tree(tree_uuid) return get_resources_hashes(container) - def get_hostnames(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]: + def get_hostnames(self, tree_uuid: str, /, hostnode_uuid: str | None=None, urlnode_uuid: str | None=None) -> set[str]: """Return all the unique hostnames: * of a complete tree if no hostnode_uuid and urlnode_uuid are given * of a HostNode if hostnode_uuid is given @@ -1291,7 +1293,7 @@ def get_hostnames(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, ur ct = self.get_crawled_tree(tree_uuid) return {node.name for node in ct.root_hartree.hostname_tree.traverse()} - def get_urls(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]: + def get_urls(self, tree_uuid: str, /, hostnode_uuid: str | None=None, urlnode_uuid: str | None=None) -> set[str]: """Return all the unique URLs: * of a complete tree if no hostnode_uuid and urlnode_uuid are given * of a HostNode if hostnode_uuid is given @@ -1307,18 +1309,18 @@ def get_urls(self, tree_uuid: str, /, hostnode_uuid: Optional[str]=None, urlnode ct = self.get_crawled_tree(tree_uuid) return {node.name for node in ct.root_hartree.url_tree.traverse()} - def get_playwright_devices(self) -> Dict: + def get_playwright_devices(self) -> dict[str, Any]: """Get the preconfigured devices from Playwright""" return get_devices() - def get_hostnode_investigator(self, capture_uuid: str, /, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]: + def get_hostnode_investigator(self, capture_uuid: str, /, node_uuid: str) -> tuple[HostNode, list[dict[str, Any]]]: '''Gather all the informations needed to display the Hostnode investigator popup.''' - def normalize_known_content(h: str, /, known_content: Dict[str, Any], url: URLNode) -> Tuple[Optional[Union[str, List[Any]]], Optional[Tuple[bool, Any]]]: + def normalize_known_content(h: str, /, known_content: dict[str, Any], url: URLNode) -> tuple[str | list[Any] | None, tuple[bool, Any] | None]: ''' There are a few different sources to figure out known vs. 
legitimate content, this method normalize it for the web interface.''' - known: Optional[Union[str, List[Any]]] = None - legitimate: Optional[Tuple[bool, Any]] = None + known: str | list[Any] | None = None + legitimate: tuple[bool, Any] | None = None if h not in known_content: return known, legitimate @@ -1340,13 +1342,13 @@ def normalize_known_content(h: str, /, known_content: Dict[str, Any], url: URLNo known_content = self.context.find_known_content(hostnode) self.uwhois.query_whois_hostnode(hostnode) - urls: List[Dict[str, Any]] = [] + urls: list[dict[str, Any]] = [] for url in hostnode.urls: # For the popup, we need: # * https vs http # * everything after the domain # * the full URL - to_append: Dict[str, Any] = { + to_append: dict[str, Any] = { 'encrypted': url.name.startswith('https'), 'url_path': url.name.split('/', 3)[-1], 'url_object': url, @@ -1389,7 +1391,7 @@ def normalize_known_content(h: str, /, known_content: Dict[str, Any], url: URLNo # Optional: Cookies sent to server in request -> map to nodes who set the cookie in response if hasattr(url, 'cookies_sent'): - to_display_sent: Dict[str, Set[Iterable[Optional[str]]]] = defaultdict(set) + to_display_sent: dict[str, set[Iterable[str | None]]] = defaultdict(set) for cookie, contexts in url.cookies_sent.items(): if not contexts: # Locally created? @@ -1401,7 +1403,7 @@ def normalize_known_content(h: str, /, known_content: Dict[str, Any], url: URLNo # Optional: Cookies received from server in response -> map to nodes who send the cookie in request if hasattr(url, 'cookies_received'): - to_display_received: Dict[str, Dict[str, Set[Iterable[Optional[str]]]]] = {'3rd_party': defaultdict(set), 'sent': defaultdict(set), 'not_sent': defaultdict(set)} + to_display_received: dict[str, dict[str, set[Iterable[str | None]]]] = {'3rd_party': defaultdict(set), 'sent': defaultdict(set), 'not_sent': defaultdict(set)} for domain, c_received, is_3rd_party in url.cookies_received: if c_received not in ct.root_hartree.cookies_sent: # This cookie is never sent. 
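(A minimal, hypothetical sketch of the typing pattern this patch applies throughout lookyloo.py and the modules below; `summarize_capture` is illustrative only and not part of the codebase. With `from __future__ import annotations` at the top of each file, the PEP 604 unions and PEP 585 builtin generics are stored as unevaluated strings at runtime, so they stay compatible with the >=3.8.1 interpreter floor recorded in poetry.lock.)

from __future__ import annotations

from datetime import datetime
from typing import Any


def summarize_capture(uuid: str, *, cut_time: datetime | None = None) -> dict[str, Any]:
    # `datetime | None` replaces Optional[datetime]; dict[str, Any] replaces typing.Dict[str, Any].
    # Because annotations are deferred by the future import, this parses and runs on Python 3.8 too.
    if cut_time is None:
        cut_time = datetime.now()
    return {'uuid': uuid, 'cut_time': cut_time.isoformat()}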
@@ -1421,14 +1423,14 @@ def normalize_known_content(h: str, /, known_content: Dict[str, Any], url: URLNo urls.append(to_append) return hostnode, urls - def get_stats(self) -> Dict[str, List]: + def get_stats(self) -> dict[str, list[Any]]: '''Gather statistics about the lookyloo instance''' today = date.today() calendar_week = today.isocalendar()[1] stats_dict = {'submissions': 0, 'redirects': 0} - stats: Dict[int, Dict[int, Dict[str, Any]]] = {} - weeks_stats: Dict[int, Dict] = {} + stats: dict[int, dict[int, dict[str, Any]]] = {} + weeks_stats: dict[int, dict[str, Any]] = {} # Only recent captures that are not archived for cache in self.sorted_capture_cache(): @@ -1467,7 +1469,7 @@ def get_stats(self) -> Dict[str, List]: stats[capture_ts.year][capture_ts.month] = {'submissions': 0} stats[capture_ts.year][capture_ts.month]['submissions'] += 1 - statistics: Dict[str, List] = {'weeks': [], 'years': []} + statistics: dict[str, list[Any]] = {'weeks': [], 'years': []} for week_number in sorted(weeks_stats.keys()): week_stat = weeks_stats[week_number] urls = week_stat.pop('uniq_urls') @@ -1477,7 +1479,7 @@ def get_stats(self) -> Dict[str, List]: statistics['weeks'].append(week_stat) for year in sorted(stats.keys()): - year_stats: Dict[str, Union[int, List]] = {'year': year, 'months': [], 'yearly_submissions': 0} + year_stats: dict[str, int | list[Any]] = {'year': year, 'months': [], 'yearly_submissions': 0} for month in sorted(stats[year].keys()): month_stats = stats[year][month] if len(month_stats) == 1: @@ -1496,15 +1498,15 @@ def get_stats(self) -> Dict[str, List]: return statistics def store_capture(self, uuid: str, is_public: bool, - os: Optional[str]=None, browser: Optional[str]=None, - parent: Optional[str]=None, - downloaded_filename: Optional[str]=None, downloaded_file: Optional[bytes]=None, - error: Optional[str]=None, har: Optional[Dict[str, Any]]=None, - png: Optional[bytes]=None, html: Optional[str]=None, - last_redirected_url: Optional[str]=None, - cookies: Optional[Union[List['Cookie'], List[Dict[str, str]]]]=None, - capture_settings: Optional[CaptureSettings]=None, - potential_favicons: Optional[Set[bytes]]=None + os: str | None=None, browser: str | None=None, + parent: str | None=None, + downloaded_filename: str | None=None, downloaded_file: bytes | None=None, + error: str | None=None, har: dict[str, Any] | None=None, + png: bytes | None=None, html: str | None=None, + last_redirected_url: str | None=None, + cookies: list[Cookie] | list[dict[str, str]] | None=None, + capture_settings: CaptureSettings | None=None, + potential_favicons: set[bytes] | None=None ) -> None: now = datetime.now() @@ -1512,7 +1514,7 @@ def store_capture(self, uuid: str, is_public: bool, safe_create_dir(dirpath) if os or browser: - meta: Dict[str, str] = {} + meta: dict[str, str] = {} if os: meta['os'] = os if browser: diff --git a/lookyloo/modules/__init__.py b/lookyloo/modules/__init__.py index 5a2c378e..0f5ef970 100644 --- a/lookyloo/modules/__init__.py +++ b/lookyloo/modules/__init__.py @@ -14,3 +14,22 @@ from .urlhaus import URLhaus # noqa from .cloudflare import Cloudflare # noqa from .circlpdns import CIRCLPDNS # noqa + +__all__ = [ + 'FOX', + 'MISPs', + 'MISP', + 'PhishingInitiative', + 'SaneJavaScript', + 'UrlScan', + 'UniversalWhois', + 'VirusTotal', + 'Pandora', + 'Phishtank', + 'Hashlookup', + 'RiskIQ', + 'RiskIQError', + 'URLhaus', + 'Cloudflare', + 'CIRCLPDNS' +] diff --git a/lookyloo/modules/circlpdns.py b/lookyloo/modules/circlpdns.py index ed55ac62..ee577b18 100644 --- 
a/lookyloo/modules/circlpdns.py +++ b/lookyloo/modules/circlpdns.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json from datetime import date from typing import Dict, List, Optional, TYPE_CHECKING from urllib.parse import urlparse -from pypdns import PyPDNS, PDNSRecord +from pypdns import PyPDNS, PDNSRecord # type: ignore[attr-defined] from ..default import ConfigError, get_homedir from ..helpers import get_cache_directory @@ -32,7 +34,7 @@ def module_init(self) -> bool: self.storage_dir_pypdns.mkdir(parents=True, exist_ok=True) return True - def get_passivedns(self, query: str) -> Optional[List[PDNSRecord]]: + def get_passivedns(self, query: str) -> list[PDNSRecord] | None: # The query can be IP or Hostname. For now, we only do it on domains. url_storage_dir = get_cache_directory(self.storage_dir_pypdns, query, 'pdns') if not url_storage_dir.exists(): @@ -44,7 +46,7 @@ def get_passivedns(self, query: str) -> Optional[List[PDNSRecord]]: with cached_entries[0].open() as f: return [PDNSRecord(record) for record in json.load(f)] - def capture_default_trigger(self, cache: 'CaptureCache', /, *, force: bool=False, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if not self.available: return {'error': 'Module not available'} diff --git a/lookyloo/modules/fox.py b/lookyloo/modules/fox.py index a2f12ee3..4c0f9fdc 100644 --- a/lookyloo/modules/fox.py +++ b/lookyloo/modules/fox.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 -from typing import Dict +from __future__ import annotations + +from typing import Dict, Any import requests @@ -29,7 +31,7 @@ def module_init(self) -> bool: return True - def capture_default_trigger(self, url: str, /, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, url: str, /, auto_trigger: bool=False) -> dict[str, str]: '''Run the module on the initial URL''' if not self.available: return {'error': 'Module not available'} @@ -52,7 +54,7 @@ def __submit_url(self, url: str, ) -> bool: response.raise_for_status() return True - def url_submit(self, url: str) -> Dict: + def url_submit(self, url: str) -> dict[str, Any]: '''Submit a URL to FOX ''' if not self.available: diff --git a/lookyloo/modules/hashlookup.py b/lookyloo/modules/hashlookup.py index 29341319..1a275685 100644 --- a/lookyloo/modules/hashlookup.py +++ b/lookyloo/modules/hashlookup.py @@ -1,10 +1,12 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json from typing import Dict, List -from har2tree import CrawledTree -from pyhashlookup import Hashlookup +from har2tree import CrawledTree # type: ignore[attr-defined] +from pyhashlookup import Hashlookup # type: ignore[attr-defined] from ..default import ConfigError from ..helpers import get_useragent_for_requests @@ -31,7 +33,7 @@ def module_init(self) -> bool: self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False)) return True - def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: bool=False) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if not self.available: return {'error': 'Module not available'} @@ -52,14 +54,14 @@ def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, auto_trigger: return {'success': 'Module 
triggered'} - def hashes_lookup(self, hashes: List[str]) -> Dict[str, Dict[str, str]]: + def hashes_lookup(self, hashes: list[str]) -> dict[str, dict[str, str]]: '''Lookup a list of hashes against Hashlookup Note: It will trigger a request to hashlookup every time *until* there is a hit, then once a day. ''' if not self.available: raise ConfigError('Hashlookup not available, probably not enabled.') - to_return: Dict[str, Dict[str, str]] = {} + to_return: dict[str, dict[str, str]] = {} for entry in self.client.sha1_bulk_lookup(hashes): if 'SHA-1' in entry: to_return[entry['SHA-1'].lower()] = entry diff --git a/lookyloo/modules/misp.py b/lookyloo/modules/misp.py index 279e534d..c6b592e6 100644 --- a/lookyloo/modules/misp.py +++ b/lookyloo/modules/misp.py @@ -5,12 +5,12 @@ from io import BytesIO from collections import defaultdict from collections.abc import Mapping -from typing import Any, Dict, List, Optional, Set, Union, TYPE_CHECKING +from typing import Any, Dict, List, Optional, Set, Union, TYPE_CHECKING, Iterator import requests -from har2tree import HostNode, URLNode, Har2TreeError -from pymisp import MISPAttribute, MISPEvent, PyMISP -from pymisp.tools import FileObject, URLObject +from har2tree import HostNode, URLNode, Har2TreeError # type: ignore[attr-defined] +from pymisp import MISPAttribute, MISPEvent, PyMISP, MISPTag # type: ignore[attr-defined] +from pymisp.tools import FileObject, URLObject # type: ignore[attr-defined] from ..default import get_config, get_homedir from ..helpers import get_public_suffix_list @@ -21,7 +21,7 @@ from ..capturecache import CaptureCache -class MISPs(Mapping, AbstractModule): +class MISPs(Mapping, AbstractModule): # type: ignore[type-arg] def module_init(self) -> bool: if not self.config.get('default'): @@ -37,7 +37,7 @@ def module_init(self) -> bool: self.logger.warning(f"The default MISP instance ({self.default_instance}) is missing in the instances ({', '.join(self.config['instances'].keys())}), disabling MISP.") return False - self.__misps: Dict[str, 'MISP'] = {} + self.__misps = {} for instance_name, instance_config in self.config['instances'].items(): if misp_connector := MISP(config=instance_config): if misp_connector.available: @@ -56,10 +56,10 @@ def module_init(self) -> bool: def __getitem__(self, name: str) -> 'MISP': return self.__misps[name] - def __iter__(self): + def __iter__(self) -> Iterator[str]: return iter(self.__misps) - def __len__(self): + def __len__(self) -> int: return len(self.__misps) @property @@ -170,10 +170,10 @@ def module_init(self) -> bool: self.psl = get_public_suffix_list() return True - def get_fav_tags(self): + def get_fav_tags(self) -> dict[Any, Any] | list[MISPTag]: return self.client.tags(pythonify=True, favouritesOnly=1) - def _prepare_push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False, auto_publish: Optional[bool]=False) -> Union[List[MISPEvent], Dict]: + def _prepare_push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False, auto_publish: Optional[bool]=False) -> Union[List[MISPEvent], Dict[str, str]]: '''Adds the pre-configured information as required by the instance.
If duplicates aren't allowed, they will be automatically skiped and the extends_uuid key in the next element in the list updated''' @@ -196,11 +196,11 @@ def _prepare_push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplic for tag in self.default_tags: event.add_tag(tag) if auto_publish: - event.publish() + event.publish() # type: ignore[no-untyped-call] events_to_push.append(event) return events_to_push - def push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False, auto_publish: Optional[bool]=None) -> Union[List[MISPEvent], Dict]: + def push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False, auto_publish: Optional[bool]=None) -> Union[List[MISPEvent], Dict[Any, Any]]: if auto_publish is None: auto_publish = self.auto_publish if self.available and self.enable_push: diff --git a/lookyloo/modules/pandora.py b/lookyloo/modules/pandora.py index c25c946c..4769c25d 100644 --- a/lookyloo/modules/pandora.py +++ b/lookyloo/modules/pandora.py @@ -1,9 +1,11 @@ #!/usr/bin/env python3 +from __future__ import annotations + from io import BytesIO -from typing import Dict +from typing import Dict, Any -from pypandora import PyPandora +from pypandora import PyPandora # type: ignore[attr-defined] from ..default import ConfigError from ..helpers import get_useragent_for_requests @@ -27,7 +29,7 @@ def module_init(self) -> bool: return True - def capture_default_trigger(self, file_in_memory: BytesIO, filename: str, /, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, file_in_memory: BytesIO, filename: str, /, auto_trigger: bool=False) -> dict[str, str]: '''Automatically submit the file if the landing URL is a file instead of a webpage''' if not self.available: return {'error': 'Module not available'} @@ -39,7 +41,7 @@ def capture_default_trigger(self, file_in_memory: BytesIO, filename: str, /, aut self.submit_file(file_in_memory, filename) return {'success': 'Module triggered'} - def submit_file(self, file_in_memory: BytesIO, filename: str) -> Dict: + def submit_file(self, file_in_memory: BytesIO, filename: str) -> dict[str, Any]: '''Submit a file to Pandora''' if not self.available: raise ConfigError('Pandora not available, probably not able to reach the server.') diff --git a/lookyloo/modules/phishtank.py b/lookyloo/modules/phishtank.py index 393a5812..2149527a 100644 --- a/lookyloo/modules/phishtank.py +++ b/lookyloo/modules/phishtank.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json from datetime import date, datetime, timedelta, timezone from typing import Any, Dict, Optional, List, TYPE_CHECKING -from pyphishtanklookup import PhishtankLookup +from pyphishtanklookup import PhishtankLookup # type: ignore[attr-defined] from ..default import ConfigError, get_homedir from ..helpers import get_cache_directory @@ -38,7 +40,7 @@ def module_init(self) -> bool: self.storage_dir_pt.mkdir(parents=True, exist_ok=True) return True - def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: + def get_url_lookup(self, url: str) -> dict[str, Any] | None: url_storage_dir = get_cache_directory(self.storage_dir_pt, url, 'url') if not url_storage_dir.exists(): return None @@ -49,10 +51,10 @@ def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: with cached_entries[0].open() as f: return json.load(f) - def lookup_ips_capture(self, cache: 'CaptureCache') -> Dict[str, List[Dict[str, Any]]]: + def lookup_ips_capture(self, cache: CaptureCache) -> dict[str, list[dict[str, Any]]]: 
with (cache.capture_dir / 'ips.json').open() as f: ips_dump = json.load(f) - to_return: Dict[str, List[Dict[str, Any]]] = {} + to_return: dict[str, list[dict[str, Any]]] = {} for ip in {ip for ips_list in ips_dump.values() for ip in ips_list}: entry = self.get_ip_lookup(ip) if not entry: @@ -64,7 +66,7 @@ def lookup_ips_capture(self, cache: 'CaptureCache') -> Dict[str, List[Dict[str, to_return[ip].append(entry) return to_return - def get_ip_lookup(self, ip: str) -> Optional[Dict[str, Any]]: + def get_ip_lookup(self, ip: str) -> dict[str, Any] | None: ip_storage_dir = get_cache_directory(self.storage_dir_pt, ip, 'ip') if not ip_storage_dir.exists(): return None @@ -75,7 +77,7 @@ def get_ip_lookup(self, ip: str) -> Optional[Dict[str, Any]]: with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, cache: 'CaptureCache', /, *, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, cache: CaptureCache, /, *, auto_trigger: bool=False) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if not self.available: return {'error': 'Module not available'} diff --git a/lookyloo/modules/pi.py b/lookyloo/modules/pi.py index 5cd7b70d..fccca3ae 100644 --- a/lookyloo/modules/pi.py +++ b/lookyloo/modules/pi.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json import time from datetime import date from typing import Any, Dict, Optional, TYPE_CHECKING -from pyeupi import PyEUPI +from pyeupi import PyEUPI # type: ignore[attr-defined] from ..default import ConfigError, get_homedir from ..helpers import get_cache_directory @@ -34,7 +36,7 @@ def module_init(self) -> bool: self.storage_dir_eupi.mkdir(parents=True, exist_ok=True) return True - def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: + def get_url_lookup(self, url: str) -> dict[str, Any] | None: url_storage_dir = get_cache_directory(self.storage_dir_eupi, url) if not url_storage_dir.exists(): return None @@ -45,7 +47,7 @@ def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, cache: 'CaptureCache', /, *, force: bool=False, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if not self.available: return {'error': 'Module not available'} diff --git a/lookyloo/modules/riskiq.py b/lookyloo/modules/riskiq.py index 67938d96..9e227adb 100644 --- a/lookyloo/modules/riskiq.py +++ b/lookyloo/modules/riskiq.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json from datetime import date, datetime, timedelta @@ -56,7 +58,7 @@ def module_init(self) -> bool: self.storage_dir_riskiq.mkdir(parents=True, exist_ok=True) return True - def get_passivedns(self, query: str) -> Optional[Dict[str, Any]]: + def get_passivedns(self, query: str) -> dict[str, Any] | None: # The query can be IP or Hostname. For now, we only do it on domains. 
url_storage_dir = get_cache_directory(self.storage_dir_riskiq, query, 'pdns') if not url_storage_dir.exists(): @@ -68,7 +70,7 @@ def get_passivedns(self, query: str) -> Optional[Dict[str, Any]]: with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, cache: 'CaptureCache', /, *, force: bool=False, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: '''Run the module on all the nodes up to the final redirect''' if not self.available: return {'error': 'Module not available'} @@ -88,7 +90,7 @@ def capture_default_trigger(self, cache: 'CaptureCache', /, *, force: bool=False self.pdns_lookup(hostname, force) return {'success': 'Module triggered'} - def pdns_lookup(self, hostname: str, force: bool=False, first_seen: Optional[Union[date, datetime]]=None) -> None: + def pdns_lookup(self, hostname: str, force: bool=False, first_seen: date | datetime | None=None) -> None: '''Lookup an hostname on RiskIQ Passive DNS Note: force means re-fetch the entry RiskIQ even if we already did it today ''' diff --git a/lookyloo/modules/sanejs.py b/lookyloo/modules/sanejs.py index 46182def..dfccd623 100644 --- a/lookyloo/modules/sanejs.py +++ b/lookyloo/modules/sanejs.py @@ -1,10 +1,12 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json from datetime import date from typing import Dict, Iterable, List, Union -from pysanejs import SaneJS +from pysanejs import SaneJS # type: ignore[attr-defined] from ..default import get_homedir @@ -29,7 +31,7 @@ def module_init(self) -> bool: self.storage_dir.mkdir(parents=True, exist_ok=True) return True - def hashes_lookup(self, sha512: Union[Iterable[str], str], force: bool=False) -> Dict[str, List[str]]: + def hashes_lookup(self, sha512: Iterable[str] | str, force: bool=False) -> dict[str, list[str]]: if isinstance(sha512, str): hashes: Iterable[str] = [sha512] else: @@ -43,7 +45,7 @@ def hashes_lookup(self, sha512: Union[Iterable[str], str], force: bool=False) -> with sanejs_unknowns.open() as f: unknown_hashes = {line.strip() for line in f.readlines()} - to_return: Dict[str, List[str]] = {} + to_return: dict[str, list[str]] = {} if force: to_lookup = hashes diff --git a/lookyloo/modules/urlhaus.py b/lookyloo/modules/urlhaus.py index c531ede9..04b61873 100644 --- a/lookyloo/modules/urlhaus.py +++ b/lookyloo/modules/urlhaus.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json from datetime import date from typing import Any, Dict, Optional, TYPE_CHECKING @@ -29,7 +31,7 @@ def module_init(self) -> bool: self.storage_dir_uh.mkdir(parents=True, exist_ok=True) return True - def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: + def get_url_lookup(self, url: str) -> dict[str, Any] | None: url_storage_dir = get_cache_directory(self.storage_dir_uh, url, 'url') if not url_storage_dir.exists(): return None @@ -40,13 +42,13 @@ def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]: with cached_entries[0].open() as f: return json.load(f) - def __url_result(self, url: str) -> Dict: + def __url_result(self, url: str) -> dict[str, Any]: data = {'url': url} response = requests.post(f'{self.url}/url/', data) response.raise_for_status() return response.json() - def capture_default_trigger(self, cache: 'CaptureCache', /, *, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, cache: CaptureCache, /, *, auto_trigger: bool=False) -> dict[str, str]: '''Run 
the module on all the nodes up to the final redirect''' if not self.available: return {'error': 'Module not available'} diff --git a/lookyloo/modules/urlscan.py b/lookyloo/modules/urlscan.py index 70bcd5e7..58ca6f7a 100644 --- a/lookyloo/modules/urlscan.py +++ b/lookyloo/modules/urlscan.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from __future__ import annotations + import json from datetime import date from typing import Any, Dict, Optional, TYPE_CHECKING @@ -47,7 +49,7 @@ def module_init(self) -> bool: self.storage_dir_urlscan.mkdir(parents=True, exist_ok=True) return True - def get_url_submission(self, capture_info: 'CaptureCache') -> Dict[str, Any]: + def get_url_submission(self, capture_info: CaptureCache) -> dict[str, Any]: url_storage_dir = get_cache_directory( self.storage_dir_urlscan, f'{capture_info.url}{capture_info.user_agent}{capture_info.referer}', @@ -61,7 +63,7 @@ def get_url_submission(self, capture_info: 'CaptureCache') -> Dict[str, Any]: with cached_entries[0].open() as f: return json.load(f) - def capture_default_trigger(self, capture_info: 'CaptureCache', /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> Dict: + def capture_default_trigger(self, capture_info: CaptureCache, /, visibility: str, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]: '''Run the module on the initial URL''' if not self.available: return {'error': 'Module not available'} @@ -75,7 +77,7 @@ def capture_default_trigger(self, capture_info: 'CaptureCache', /, visibility: s self.url_submit(capture_info, visibility, force) return {'success': 'Module triggered'} - def __submit_url(self, url: str, useragent: Optional[str], referer: Optional[str], visibility: str) -> Dict: + def __submit_url(self, url: str, useragent: str | None, referer: str | None, visibility: str) -> dict[str, Any]: data = {'customagent': useragent if useragent else '', 'referer': referer if referer else ''} if not url.startswith('http'): @@ -96,12 +98,12 @@ def __submit_url(self, url: str, useragent: Optional[str], referer: Optional[str response.raise_for_status() return response.json() - def __url_result(self, uuid: str) -> Dict: + def __url_result(self, uuid: str) -> dict[str, Any]: response = self.client.get(f'https://urlscan.io/api/v1/result/{uuid}') response.raise_for_status() return response.json() - def url_submit(self, capture_info: 'CaptureCache', visibility: str, force: bool=False) -> Dict: + def url_submit(self, capture_info: CaptureCache, visibility: str, force: bool=False) -> dict[str, Any]: '''Lookup an URL on urlscan.io Note: force means 2 things: * (re)scan of the URL @@ -142,7 +144,7 @@ def url_submit(self, capture_info: 'CaptureCache', visibility: str, force: bool= return response return {'error': 'Submitting is not allowed by the configuration'} - def url_result(self, capture_info: 'CaptureCache'): + def url_result(self, capture_info: CaptureCache) -> dict[str, Any]: '''Get the result from a submission.''' submission = self.get_url_submission(capture_info) if submission and 'uuid' in submission: diff --git a/lookyloo/modules/uwhois.py b/lookyloo/modules/uwhois.py index d35d1d3e..cf77baba 100644 --- a/lookyloo/modules/uwhois.py +++ b/lookyloo/modules/uwhois.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 +from __future__ import annotations + import re import socket from typing import overload, Literal, List, Union -from har2tree import CrawledTree, Har2TreeError, HostNode +from har2tree import CrawledTree, Har2TreeError, HostNode # type: ignore[attr-defined] from .abstractmodule import 
AbstractModule
@@ -62,7 +64,7 @@ def capture_default_trigger(self, crawled_tree: CrawledTree, /, *, force: bool=F
             self.query_whois_hostnode(n)
 
     @overload
-    def whois(self, query: str, contact_email_only: Literal[True]) -> List[str]:
+    def whois(self, query: str, contact_email_only: Literal[True]) -> list[str]:
         ...
 
     @overload
@@ -70,10 +72,10 @@ def whois(self, query: str, contact_email_only: Literal[False]) -> str:
         ...
 
     @overload
-    def whois(self, query: str, contact_email_only: bool=False) -> Union[str, List[str]]:
+    def whois(self, query: str, contact_email_only: bool=False) -> str | list[str]:
         ...
 
-    def whois(self, query: str, contact_email_only: bool=False) -> Union[str, List[str]]:
+    def whois(self, query: str, contact_email_only: bool=False) -> str | list[str]:
         if not self.available:
             return ''
         bytes_whois = b''
diff --git a/lookyloo/modules/vt.py b/lookyloo/modules/vt.py
index 457d46b1..f13ed174 100644
--- a/lookyloo/modules/vt.py
+++ b/lookyloo/modules/vt.py
@@ -1,5 +1,7 @@
 #!/usr/bin/env python3
 
+from __future__ import annotations
+
 import json
 import time
 from datetime import date
@@ -18,9 +20,10 @@
 from .abstractmodule import AbstractModule
 
 
-def jsonify_vt(obj: WhistleBlowerDict):
+def jsonify_vt(obj: WhistleBlowerDict) -> dict[str, Any]:
     if isinstance(obj, WhistleBlowerDict):
         return {k: v for k, v in obj.items()}
+    return obj
 
 
 class VirusTotal(AbstractModule):
@@ -39,7 +42,7 @@ def module_init(self) -> bool:
         self.storage_dir_vt.mkdir(parents=True, exist_ok=True)
         return True
 
-    def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
+    def get_url_lookup(self, url: str) -> dict[str, Any] | None:
         url_storage_dir = get_cache_directory(self.storage_dir_vt, vt.url_id(url))
         if not url_storage_dir.exists():
             return None
@@ -54,7 +57,7 @@ def get_url_lookup(self, url: str) -> Optional[Dict[str, Any]]:
             cached_entries[0].unlink(missing_ok=True)
             return None
 
-    def capture_default_trigger(self, cache: 'CaptureCache', /, *, force: bool=False, auto_trigger: bool=False) -> Dict:
+    def capture_default_trigger(self, cache: CaptureCache, /, *, force: bool=False, auto_trigger: bool=False) -> dict[str, str]:
         '''Run the module on all the nodes up to the final redirect'''
         if not self.available:
             return {'error': 'Module not available'}
diff --git a/mypy.ini b/mypy.ini
new file mode 100644
index 00000000..6e76e80e
--- /dev/null
+++ b/mypy.ini
@@ -0,0 +1,8 @@
+[mypy]
+strict = True
+warn_return_any = False
+show_error_context = True
+pretty = True
+
+[mypy-docs.source.*]
+ignore_errors = True
diff --git a/poetry.lock b/poetry.lock
index d5d69d6f..1c58aaf9 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1447,18 +1447,18 @@ referencing = ">=0.31.0"
 
 [[package]]
 name = "lacuscore"
-version = "1.7.8"
+version = "1.7.9"
 description = "Core of Lacus, usable as a module"
 optional = false
 python-versions = ">=3.8,<4.0"
 files = [
-    {file = "lacuscore-1.7.8-py3-none-any.whl", hash = "sha256:b877567a7efb35802c5fb6a01a8b88602978c16b49ee0ceead937337c6710081"},
-    {file = "lacuscore-1.7.8.tar.gz", hash = "sha256:e0aa938a6555c8fe8485777e04c2ca549cd3b1fd7a75e7839d49a3fef1499252"},
+    {file = "lacuscore-1.7.9-py3-none-any.whl", hash = "sha256:74309aa4216fabffadd4ab724f8f2273d12e59dedd8e826e2710847d92497f8c"},
+    {file = "lacuscore-1.7.9.tar.gz", hash = "sha256:cb0df82d88ffe805fc78c60e535ee54d82842b763a84ad97cfc2a5a99d4c3ed7"},
 ]
 
 [package.dependencies]
 defang = ">=0.5.3,<0.6.0"
-playwrightcapture = {version = ">=1.22.5,<2.0.0", extras = ["recaptcha"]}
+playwrightcapture = {version = ">=1.22.6,<2.0.0", extras = ["recaptcha"]}
 redis = {version = ">=5.0.1,<6.0.0", extras = ["hiredis"]}
 requests = ">=2.31.0,<3.0.0"
 ua-parser = ">=0.18.0,<0.19.0"
@@ -2154,13 +2154,13 @@ test = ["pytest"]
 
 [[package]]
 name = "playwrightcapture"
-version = "1.22.5"
+version = "1.22.6"
 description = "A simple library to capture websites using playwright"
 optional = false
 python-versions = ">=3.8,<4.0"
 files = [
-    {file = "playwrightcapture-1.22.5-py3-none-any.whl", hash = "sha256:023d394efe2c6173178ac7a9143a9b77400704b965280c494e9bb418eaa2ea86"},
-    {file = "playwrightcapture-1.22.5.tar.gz", hash = "sha256:8fac3bf723536ebc6ff0e1908aa838029a8b6e8ed1998fd162d5557d1d3fb2ec"},
+    {file = "playwrightcapture-1.22.6-py3-none-any.whl", hash = "sha256:910ad4dabbc51864f1c8fed6e62c2869a519211bcf7ae6e9c5aac3ea29268e33"},
+    {file = "playwrightcapture-1.22.6.tar.gz", hash = "sha256:b5c377585aba9ff71f055127b6be86458503ff3308e8fc8225dd4c05ab9597ae"},
 ]
 
 [package.dependencies]
@@ -2173,7 +2173,7 @@ pytz = {version = ">=2023.3.post1,<2024.0", markers = "python_version < \"3.9\""
 requests = {version = ">=2.31.0,<3.0.0", extras = ["socks"], optional = true, markers = "extra == \"recaptcha\""}
 setuptools = ">=69.0.3,<70.0.0"
 SpeechRecognition = {version = ">=3.10.1,<4.0.0", optional = true, markers = "extra == \"recaptcha\""}
-tzdata = ">=2023.3,<2024.0"
+tzdata = ">=2023.4,<2024.0"
 w3lib = ">=2.1.2,<3.0.0"
 
 [package.extras]
@@ -3592,4 +3592,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.8.1,<3.12"
-content-hash = "9e6afc44fccf8789e1968b698fc9a6632bfb7fb5d053a404356000386d1fd3ad"
+content-hash = "95ea92c4f809ea280840866efc4385f75bbb4c7ace7cb9ac4979c17df722fd02"
diff --git a/pyproject.toml b/pyproject.toml
index bc28ed74..d8fa844f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -65,7 +65,7 @@ passivetotal = "^2.5.9"
 werkzeug = "^3.0.1"
 filetype = "^1.2.0"
 pypandora = "^1.6.1"
-lacuscore = "^1.7.8"
+lacuscore = "^1.7.9"
 pylacus = "^1.7.1"
 pyipasnhistory = "^2.1.2"
 publicsuffixlist = "^0.10.0.20231214"
@@ -103,17 +103,3 @@ types-pytz = "^2023.3.1.1"
 [build-system]
 requires = ["poetry_core"]
 build-backend = "poetry.core.masonry.api"
-
-[tool.mypy]
-check_untyped_defs = true
-ignore_errors = false
-ignore_missing_imports = false
-strict_optional = true
-no_implicit_optional = true
-warn_unused_ignores = true
-warn_redundant_casts = true
-warn_unused_configs = true
-warn_unreachable = true
-
-show_error_context = true
-pretty = true
diff --git a/tools/change_captures_dir.py b/tools/change_captures_dir.py
index 4143d26b..4d235816 100755
--- a/tools/change_captures_dir.py
+++ b/tools/change_captures_dir.py
@@ -9,7 +9,7 @@
 from lookyloo.helpers import get_captures_dir
 
 
-def rename_captures():
+def rename_captures() -> None:
     r = Redis(unix_socket_path=get_socket_path('cache'))
     capture_dir: Path = get_captures_dir()
     for uuid_path in capture_dir.glob('*/uuid'):
diff --git a/tools/check_s3fs_entry.py b/tools/check_s3fs_entry.py
index 3b6290fb..9749a939 100644
--- a/tools/check_s3fs_entry.py
+++ b/tools/check_s3fs_entry.py
@@ -9,7 +9,7 @@
 from lookyloo.default import get_config
 
 
-def check_path(path: str):
+def check_path(path: str) -> dict[str, str]:
     s3fs_config = get_config('generic', 's3fs')
     s3fs_client = s3fs.S3FileSystem(key=s3fs_config['config']['key'],
                                     secret=s3fs_config['config']['secret'],
diff --git a/tools/generate_sri.py b/tools/generate_sri.py
index 19b949de..30d43de2 100755
--- a/tools/generate_sri.py
+++ b/tools/generate_sri.py
@@ -4,14 +4,14 @@
 import hashlib
 import json
 
-from typing import Dict
+from typing import Dict, Any
 
 from lookyloo.default import get_homedir
 
 if __name__ == '__main__':
     dest_dir = get_homedir() / 'website' / 'web'
 
-    to_save: Dict = {'static': {}}
+    to_save: Dict[str, Any] = {'static': {}}
 
     for resource in (dest_dir / 'static').glob('*'):
         if resource.name[0] == '.':
diff --git a/tools/manual_parse_ua_list.py b/tools/manual_parse_ua_list.py
index e207d652..4446550d 100644
--- a/tools/manual_parse_ua_list.py
+++ b/tools/manual_parse_ua_list.py
@@ -73,7 +73,7 @@ def ua_parser(html_content: str) -> Dict[str, Any]:
     return to_store
 
 
-def main():
+def main() -> None:
     to_parse = Path('Most Common User Agents - Tech Blog (wh).html')
 
     today = datetime.now()
diff --git a/tools/monitoring.py b/tools/monitoring.py
index 33a74bb5..25443ed2 100755
--- a/tools/monitoring.py
+++ b/tools/monitoring.py
@@ -1,9 +1,11 @@
 #!/usr/bin/env python3
 
+from __future__ import annotations
+
 import os
 import sys
 
-from typing import List, Tuple
+from typing import List, Tuple, Any
 
 from redis import Redis
 from redis.exceptions import ConnectionError
@@ -21,11 +23,11 @@
 class Monitoring():
 
     def __init__(self) -> None:
-        self.redis_cache: Redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
-        self.redis_indexing: Redis = Redis(unix_socket_path=get_socket_path('indexing'), decode_responses=True)
+        self.redis_cache: Redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)  # type: ignore[type-arg]
+        self.redis_indexing: Redis = Redis(unix_socket_path=get_socket_path('indexing'), decode_responses=True)  # type: ignore[type-arg]
 
     @property
-    def backend_status(self):
+    def backend_status(self) -> bool:
         socket_path_cache = get_socket_path('cache')
         socket_path_index = get_socket_path('indexing')
         backend_up = True
@@ -56,12 +58,12 @@ def backend_status(self):
         return backend_up
 
     @property
-    def queues(self):
+    def queues(self) -> list[tuple[str, float]]:
         return self.redis_cache.zrevrangebyscore('queues', 'Inf', '-Inf', withscores=True)
 
     @property
-    def ongoing_captures(self):
-        captures_uuid: List[Tuple[str, float]] = self.redis_cache.zrevrangebyscore('to_capture', 'Inf', '-Inf', withscores=True)
+    def ongoing_captures(self) -> list[tuple[str, float, dict[str, Any]]]:
+        captures_uuid: list[tuple[str, float]] = self.redis_cache.zrevrangebyscore('to_capture', 'Inf', '-Inf', withscores=True)
         if not captures_uuid:
             return []
         to_return = []
@@ -75,7 +77,7 @@ def ongoing_captures(self):
         return to_return
 
     @property
-    def tree_cache(self):
+    def tree_cache(self) -> dict[str, str]:
         to_return = {}
         for pid_name, value in self.redis_cache.hgetall('tree_cache').items():
             pid, name = pid_name.split('|', 1)
diff --git a/tools/rebuild_caches.py b/tools/rebuild_caches.py
index 59913386..f1e6fff3 100755
--- a/tools/rebuild_caches.py
+++ b/tools/rebuild_caches.py
@@ -4,14 +4,14 @@
 import argparse
 import logging
 
-from lookyloo.lookyloo import Indexing, Lookyloo
+from lookyloo import Indexing, Lookyloo
 from lookyloo.helpers import get_captures_dir
 
 logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s:%(message)s',
                     level=logging.INFO)
 
 
-def main():
+def main() -> None:
     parser = argparse.ArgumentParser(description='Rebuild the redis cache.')
     parser.add_argument('--rebuild_pickles', default=False, action='store_true', help='Delete and rebuild the pickles. Count 20s/pickle, it can take a very long time.')
     args = parser.parse_args()
@@ -30,7 +30,7 @@ def main():
         with index.open('r') as _f:
             recent_uuids = {uuid: str(index.parent / dirname) for uuid, dirname in csv.reader(_f) if (index.parent / dirname).exists()}
         if recent_uuids:
-            lookyloo.redis.hset('lookup_dirs', mapping=recent_uuids)
+            lookyloo.redis.hset('lookup_dirs', mapping=recent_uuids)  # type: ignore[arg-type]
 
     # This call will rebuild all the caches as needed.
     lookyloo.sorted_capture_cache()
diff --git a/tools/stats.py b/tools/stats.py
index 97e9f0a6..60d8d456 100644
--- a/tools/stats.py
+++ b/tools/stats.py
@@ -1,8 +1,8 @@
-from lookyloo.lookyloo import Lookyloo
+from lookyloo import Lookyloo
 import calendar
 import datetime
 from urllib.parse import urlparse
-from typing import Dict, Any, Union, Set
+from typing import Dict, Any, Union, Set, List
 
 lookyloo = Lookyloo()
 
@@ -15,11 +15,12 @@
                                           calendar_week: {'analysis': 0, 'analysis_with_redirects': 0, 'redirects': 0, 'uniq_urls': set()}}
 
 
-def uniq_domains(uniq_urls):
+def uniq_domains(uniq_urls: List[str]) -> Set[str]:
     domains = set()
     for url in uniq_urls:
         splitted = urlparse(url)
-        domains.add(splitted.hostname)
+        if splitted.hostname:
+            domains.add(splitted.hostname)
     return domains
 
 
@@ -50,8 +51,8 @@ def uniq_domains(uniq_urls):
         print(' Number of analysis with redirects:', week_stat['analysis_with_redirects'])
         print(' Number of redirects:', week_stat['redirects'])
         print(' Number of unique URLs:', len(week_stat['uniq_urls']))  # type: ignore
-        domains = uniq_domains(week_stat['uniq_urls'])
-        print(' Number of unique domains:', len(domains))
+        d = uniq_domains(week_stat['uniq_urls'])  # type: ignore[arg-type]
+        print(' Number of unique domains:', len(d))
 
 
 for year, data in stats.items():
diff --git a/tools/validate_config_files.py b/tools/validate_config_files.py
index 0801a99b..ef20dc79 100755
--- a/tools/validate_config_files.py
+++ b/tools/validate_config_files.py
@@ -7,7 +7,7 @@
 from lookyloo.default import get_homedir
 
 
-def validate_generic_config_file():
+def validate_generic_config_file() -> bool:
     sample_config = get_homedir() / 'config' / 'generic.json.sample'
     with sample_config.open() as f:
         generic_config_sample = json.load(f)
@@ -53,7 +53,7 @@ def validate_generic_config_file():
     return True
 
 
-def validate_modules_config_file():
+def validate_modules_config_file() -> bool:
     with (get_homedir() / 'config' / 'modules.json').open() as f:
         modules_config = json.load(f)
     with (get_homedir() / 'config' / 'modules.json.sample').open() as f:
@@ -69,7 +69,7 @@ def validate_modules_config_file():
     return True
 
 
-def update_user_configs():
+def update_user_configs() -> bool:
     for file_name in ['generic', 'modules']:
         with (get_homedir() / 'config' / f'{file_name}.json').open() as f:
             try:
diff --git a/website/web/__init__.py b/website/web/__init__.py
index f1b3225e..e840069a 100644
--- a/website/web/__init__.py
+++ b/website/web/__init__.py
@@ -1,5 +1,7 @@
 #!/usr/bin/env python3
 
+from __future__ import annotations
+
 import base64
 import calendar
 import functools
@@ -22,14 +24,15 @@
 from zipfile import ZipFile
 
 import flask_login  # type: ignore
-from flask import (Flask, Response, flash, jsonify, redirect, render_template,
+from flask import (Flask, Response, Request, flash, jsonify, redirect, render_template,
                    request, send_file, url_for)
 from flask_bootstrap import Bootstrap5  # type: ignore
 from flask_cors import CORS  # type: ignore
 from flask_restx import Api  # type: ignore
 from lacuscore import CaptureStatus
-from pymisp import MISPEvent, MISPServerError
+from pymisp import MISPEvent, MISPServerError  # type: ignore[attr-defined]
 from werkzeug.security import check_password_hash
+from werkzeug.wrappers.response import Response as WerkzeugResponse
 
 from lookyloo.default import get_config
 from lookyloo.exceptions import MissingUUID, NoValidHarFile
@@ -71,8 +74,8 @@
 user_agents = UserAgents()
 
 
-@login_manager.user_loader
-def user_loader(username):
+@login_manager.user_loader  # type: ignore[misc]
+def user_loader(username: str) -> User | None:
     if username not in build_users_table():
         return None
     user = User()
@@ -80,13 +83,13 @@ def user_loader(username):
     return user
 
 
-@login_manager.request_loader
-def _load_user_from_request(request):
+@login_manager.request_loader  # type: ignore[misc]
+def _load_user_from_request(request: Request) -> User | None:
     return load_user_from_request(request)
 
 
 @app.route('/login', methods=['GET', 'POST'])
-def login():
+def login() -> WerkzeugResponse | str | Response:
     if request.method == 'GET':
         return '''