
Commit

chg: cleanup with annotations
Rafiot committed Jan 13, 2024
1 parent a26e80b commit 8f59858
Showing 31 changed files with 88 additions and 80 deletions.
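The pattern repeated across these files: drop the typing aliases (Dict, List, Optional, Set, Tuple, Union) in favour of PEP 585 builtin generics and PEP 604 unions, and add `from __future__ import annotations` where a file still has to run on interpreters older than Python 3.10. A minimal sketch of the before/after (illustrative only; the `lookup` function below is made up, not code from the repository):

```python
from __future__ import annotations  # PEP 563: annotations become lazily evaluated strings

# Before (old typing aliases, removed throughout this commit):
#   from typing import Dict, List, Optional
#   def lookup(names: List[str], cache: Optional[Dict[str, int]] = None) -> Dict[str, int]: ...

# After (PEP 585 builtin generics + PEP 604 unions); the __future__ import
# keeps this importable on interpreters older than Python 3.10:
def lookup(names: list[str], cache: dict[str, int] | None = None) -> dict[str, int]:
    return cache if cache is not None else {name: len(name) for name in names}
```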
1 change: 0 additions & 1 deletion bin/archiver.py
@@ -12,7 +12,6 @@

from datetime import datetime, timedelta
from pathlib import Path
-from typing import Dict, List, Optional, Set

from redis import Redis
import s3fs # type: ignore
3 changes: 1 addition & 2 deletions bin/async_capture.py
@@ -9,12 +9,11 @@
import signal

from pathlib import Path
-from typing import Optional, Set, Union

from lacuscore import LacusCore, CaptureStatus as CaptureStatusCore, CaptureResponse as CaptureResponseCore
from pylacus import PyLacus, CaptureStatus as CaptureStatusPy, CaptureResponse as CaptureResponsePy # type: ignore[attr-defined]

-from lookyloo.lookyloo import Lookyloo, CaptureSettings
+from lookyloo import Lookyloo, CaptureSettings
from lookyloo.default import AbstractManager, get_config
from lookyloo.helpers import get_captures_dir

3 changes: 1 addition & 2 deletions bin/background_indexer.py
@@ -9,11 +9,10 @@

from datetime import datetime, timedelta
from pathlib import Path
-from typing import Optional

+from lookyloo import Lookyloo
from lookyloo.default import AbstractManager, get_config
from lookyloo.exceptions import MissingUUID, NoValidHarFile
-from lookyloo.lookyloo import Lookyloo
from lookyloo.helpers import is_locked, get_sorted_captures_from_disk, make_dirs_list


6 changes: 4 additions & 2 deletions bin/background_processing.py
@@ -8,11 +8,13 @@
import logging.config
from collections import Counter
from datetime import date, timedelta
-from typing import Any, Dict, Optional
+from typing import Any

-from lookyloo.lookyloo import Lookyloo, CaptureStatusCore, CaptureStatusPy # type: ignore[attr-defined]
+from lacuscore import CaptureStatus as CaptureStatusCore
+from lookyloo import Lookyloo
from lookyloo.default import AbstractManager, get_config, get_homedir, safe_create_dir
from lookyloo.helpers import ParsedUserAgent, serialize_to_json
+from pylacus import CaptureStatus as CaptureStatusPy # type: ignore[attr-defined]

logging.config.dictConfig(get_config('logging'))

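The two `CaptureStatus` enums are now imported from their own packages (lacuscore and pylacus) instead of being re-exported through `lookyloo.lookyloo`. A hedged sketch of how a caller can still treat them uniformly, assuming both are regular enums and both define a `DONE` member (not shown in this diff):

```python
from __future__ import annotations

from lacuscore import CaptureStatus as CaptureStatusCore
from pylacus import CaptureStatus as CaptureStatusPy  # type: ignore[attr-defined]


def is_done(status: CaptureStatusCore | CaptureStatusPy) -> bool:
    # Both arguments are enum members, so comparing by .name sidesteps the
    # fact that they are distinct types; DONE is assumed to exist in both.
    return status.name == 'DONE'
```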
1 change: 0 additions & 1 deletion bin/run_backend.py
@@ -7,7 +7,6 @@
import time
from pathlib import Path
from subprocess import Popen
-from typing import Optional, Dict

from redis import Redis
from redis.exceptions import ConnectionError
5 changes: 3 additions & 2 deletions bin/start_website.py
@@ -1,10 +1,11 @@
#!/usr/bin/env python3

+from __future__ import annotations

import logging
import logging.config

from subprocess import Popen
-from typing import Optional

from lookyloo.default import get_config, get_homedir, AbstractManager

@@ -13,7 +14,7 @@

class Website(AbstractManager):

-def __init__(self, loglevel: Optional[int]=None) -> None:
+def __init__(self, loglevel: int | None=None) -> None:
super().__init__(loglevel)
self.script_name = 'website'
self.process: Popen = self._launch_website() # type: ignore[type-arg]
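The `from __future__ import annotations` line added at the top of this file is what makes the `int | None` syntax in the signature safe on older interpreters: under PEP 563 annotations are stored as strings and never evaluated at definition time. A minimal sketch of the mechanism (the `start` function below is hypothetical):

```python
from __future__ import annotations


# With PEP 563 the annotation `int | None` is never evaluated, so this
# module imports cleanly even on Python < 3.10.
def start(loglevel: int | None = None) -> None:
    print(loglevel if loglevel is not None else 'default loglevel')


start()
start(10)
```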
8 changes: 6 additions & 2 deletions lookyloo/__init__.py
@@ -1,8 +1,12 @@
import logging

-from .lookyloo import Lookyloo # noqa
+from .context import Context # noqa
from .indexing import Indexing # noqa
+from .lookyloo import Lookyloo, CaptureSettings # noqa

logging.getLogger(__name__).addHandler(logging.NullHandler())

-__all__ = ['Lookyloo', 'Indexing']
+__all__ = ['Lookyloo',
+'Indexing',
+'Context',
+'CaptureSettings']
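With the extended `__all__`, the entry points used by the scripts above are importable straight from the package root; constructor arguments are not shown in this diff, so only the import style is sketched here:

```python
# The scripts in bin/ now rely on these top-level re-exports:
from lookyloo import CaptureSettings, Context, Indexing, Lookyloo  # noqa: F401
```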
2 changes: 1 addition & 1 deletion lookyloo/capturecache.py
@@ -18,7 +18,7 @@
from functools import lru_cache, _CacheInfo as CacheInfo
from logging import Logger, LoggerAdapter
from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, Union, Set, MutableMapping, Iterator
+from typing import Any, MutableMapping, Iterator

import dns.rdatatype
import dns.resolver
9 changes: 4 additions & 5 deletions lookyloo/comparator.py
@@ -5,7 +5,7 @@
import fnmatch
import logging

-from typing import Dict, Any, Union, List, Optional, TypedDict, Tuple
+from typing import Any, TypedDict

from har2tree import URLNode # type: ignore[attr-defined]

@@ -117,10 +117,9 @@ def compare_captures(self, capture_left: str, capture_right: str, /, *, settings
raise MissingUUID(f'{capture_right} does not exists.')

different: bool = False
-to_return: dict[str, dict[str, (str |
-list[str | dict[str, Any]] |
-dict[str, (int | str |
-list[int | str | dict[str, Any]])])]] = {}
+to_return: dict[str, dict[str,
+(str | list[str | dict[str, Any]]
+| dict[str, (int | str | list[int | str | dict[str, Any]])])]] = {}
to_return['lookyloo_urls'] = {'left': f'https://{self.public_domain}/tree/{capture_left}',
'right': f'https://{self.public_domain}/tree/{capture_right}'}
left = self.get_comparables_capture(capture_left)
2 changes: 1 addition & 1 deletion lookyloo/context.py
@@ -5,7 +5,7 @@
import json
import logging
from pathlib import Path
-from typing import Any, Dict, List, Optional, Set, Union
+from typing import Any
from urllib.parse import urlsplit

from har2tree import CrawledTree, HostNode, URLNode # type: ignore[attr-defined]
51 changes: 27 additions & 24 deletions lookyloo/helpers.py
@@ -1,4 +1,7 @@
#!/usr/bin/env python3

+from __future__ import annotations

import hashlib
import json
import logging
@@ -10,7 +13,7 @@
from importlib.metadata import version
from io import BufferedIOBase
from pathlib import Path
-from typing import Any, Dict, List, Optional, Set, Union, Tuple
+from typing import Any
from urllib.parse import urlparse


@@ -29,12 +32,12 @@

# This method is used in json.dump or json.dumps calls as the default parameter:
# json.dumps(..., default=dump_to_json)
-def serialize_to_json(obj: Union[Set[Any]]) -> Union[List[Any]]:
+def serialize_to_json(obj: set[Any]) -> list[Any]:
if isinstance(obj, set):
return sorted(obj)


-def get_resources_hashes(har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
+def get_resources_hashes(har2tree_container: CrawledTree | HostNode | URLNode) -> set[str]:
if isinstance(har2tree_container, CrawledTree):
urlnodes = har2tree_container.root_hartree.url_tree.traverse()
elif isinstance(har2tree_container, HostNode):
@@ -43,7 +46,7 @@ def get_resources_hashes(har2tree_container: Union[CrawledTree, HostNode, URLNod
urlnodes = [har2tree_container]
else:
raise LookylooException(f'har2tree_container cannot be {type(har2tree_container)}')
-all_ressources_hashes: Set[str] = set()
+all_ressources_hashes: set[str] = set()
for urlnode in urlnodes:
if hasattr(urlnode, 'resources_hashes'):
all_ressources_hashes.update(urlnode.resources_hashes)
@@ -75,7 +78,7 @@ def get_email_template() -> str:
return f.read()


-def make_dirs_list(root_dir: Path) -> List[Path]:
+def make_dirs_list(root_dir: Path) -> list[Path]:
directories = []
year_now = date.today().year
oldest_year = year_now - 10
@@ -99,14 +102,14 @@ def make_ts_from_dirname(dirname: str) -> datetime:


def get_sorted_captures_from_disk(captures_dir: Path, /, *,
-cut_time: Optional[Union[datetime, date]]=None,
-keep_more_recent: bool=True) -> List[Tuple[datetime, Path]]:
+cut_time: datetime | date | None=None,
+keep_more_recent: bool=True) -> list[tuple[datetime, Path]]:
'''Recursively gets all the captures present in a specific directory, doesn't use the indexes.
NOTE: this method should never be used on archived captures as it's going to take forever on S3
'''

-all_paths: List[Tuple[datetime, Path]] = []
+all_paths: list[tuple[datetime, Path]] = []
for entry in captures_dir.iterdir():
if not entry.is_dir():
# index file
@@ -173,14 +176,14 @@ def _load_playwright_devices(self) -> None:
self.most_recent_uas[platform_key][browser_key].insert(0, parsed_ua.string)

@property
-def user_agents(self) -> Dict[str, Dict[str, List[str]]]:
+def user_agents(self) -> dict[str, dict[str, list[str]]]:
ua_files_path = sorted(self.path.glob('**/*.json'), reverse=True)
if ua_files_path[0] != self.most_recent_ua_path:
self._load_newest_ua_file(ua_files_path[0])
return self.most_recent_uas

@property
-def default(self) -> Dict[str, str]:
+def default(self) -> dict[str, str]:
'''The default useragent for desktop chrome from playwright'''
parsed_ua = ParsedUserAgent(self.playwright_devices['desktop']['default']['Desktop Chrome']['user_agent'])
platform_key = parsed_ua.platform
@@ -196,16 +199,16 @@ def default(self) -> Dict[str, str]:
'useragent': parsed_ua.string}


-def load_known_content(directory: str='known_content') -> Dict[str, Dict[str, Any]]:
-to_return: Dict[str, Dict[str, Any]] = {}
+def load_known_content(directory: str='known_content') -> dict[str, dict[str, Any]]:
+to_return: dict[str, dict[str, Any]] = {}
for known_content_file in (get_homedir() / directory).glob('*.json'):
with known_content_file.open() as f:
to_return[known_content_file.stem] = json.load(f)
return to_return


-def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, List[Dict[str, Union[str, bool]]]]]=None) -> List[Dict[str, Union[str, bool]]]:
-cookies: List[Dict[str, Union[str, bool]]]
+def load_cookies(cookie_pseudofile: BufferedIOBase | str | bytes | list[dict[str, str | bool]] | None=None) -> list[dict[str, str | bool]]:
+cookies: list[dict[str, str | bool]]
if cookie_pseudofile:
if isinstance(cookie_pseudofile, (str, bytes)):
try:
@@ -229,10 +232,10 @@ def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, L

with (get_homedir() / 'cookies.json').open() as f:
cookies = json.load(f)
-to_return: List[Dict[str, Union[str, bool]]] = []
+to_return: list[dict[str, str | bool]] = []
try:
for cookie in cookies:
-to_add: Dict[str, Union[str, bool]]
+to_add: dict[str, str | bool]
if 'Host raw' in cookie and isinstance(cookie['Host raw'], str):
# Cookie export format for Cookie Quick Manager
u = urlparse(cookie['Host raw']).netloc.split(':', 1)[0]
@@ -253,7 +256,7 @@ def load_cookies(cookie_pseudofile: Optional[Union[BufferedIOBase, str, bytes, L
return to_return


-def uniq_domains(uniq_urls: List[str]) -> Set[str]:
+def uniq_domains(uniq_urls: list[str]) -> set[str]:
domains = set()
for url in uniq_urls:
splitted = urlparse(url)
@@ -267,7 +270,7 @@ def get_useragent_for_requests() -> str:
return f'Lookyloo / {version("lookyloo")}'


-def get_cache_directory(root: Path, identifier: str, namespace: Optional[Union[str, Path]] = None) -> Path:
+def get_cache_directory(root: Path, identifier: str, namespace: str | Path | None = None) -> Path:
m = hashlib.md5()
m.update(identifier.encode())
digest = m.hexdigest()
@@ -331,26 +334,26 @@ class ParsedUserAgent(UserAgent):
# from https://python.tutorialink.com/how-do-i-get-the-user-agent-with-flask/

@cached_property
-def _details(self) -> Dict[str, Any]:
+def _details(self) -> dict[str, Any]:
return user_agent_parser.Parse(self.string)

@property
-def platform(self) -> Optional[str]: # type: ignore[override]
+def platform(self) -> str | None: # type: ignore[override]
return self._details['os'].get('family')

@property
-def platform_version(self) -> Optional[str]:
+def platform_version(self) -> str | None:
return self._aggregate_version(self._details['os'])

@property
-def browser(self) -> Optional[str]: # type: ignore[override]
+def browser(self) -> str | None: # type: ignore[override]
return self._details['user_agent'].get('family')

@property
-def version(self) -> Optional[str]: # type: ignore[override]
+def version(self) -> str | None: # type: ignore[override]
return self._aggregate_version(self._details['user_agent'])

-def _aggregate_version(self, details: Dict[str, str]) -> Optional[str]:
+def _aggregate_version(self, details: dict[str, str]) -> str | None:
return '.'.join(
part
for key in ('major', 'minor', 'patch', 'patch_minor')
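As the docstring comment in the hunk above notes, `serialize_to_json` is meant to be passed as the `default=` fallback of `json.dumps`, converting sets to sorted lists. A small usage sketch (the payload is made up):

```python
import json

from lookyloo.helpers import serialize_to_json

payload = {'hashes': {'deadbeef', 'cafebabe'}}  # sets are not JSON serializable on their own
print(json.dumps(payload, default=serialize_to_json))
# {"hashes": ["cafebabe", "deadbeef"]}
```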
2 changes: 1 addition & 1 deletion lookyloo/indexing.py
@@ -6,7 +6,7 @@
import logging
# import re
from collections import defaultdict
-from typing import Dict, Iterable, List, Optional, Set, Tuple
+from typing import Iterable
from urllib.parse import urlsplit

from har2tree import CrawledTree # type: ignore[attr-defined]
Expand Down
8 changes: 4 additions & 4 deletions lookyloo/lookyloo.py
@@ -18,12 +18,12 @@
from functools import cached_property
from io import BytesIO
from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, TYPE_CHECKING, overload, Literal
+from typing import Any, Iterable, TYPE_CHECKING, overload, Literal
from urllib.parse import urlparse
from uuid import uuid4
from zipfile import ZipFile

-from defang import defang # type: ignore
+from defang import defang # type: ignore[import-untyped]
from har2tree import CrawledTree, HostNode, URLNode # type: ignore[attr-defined]
from lacuscore import (LacusCore,
CaptureStatus as CaptureStatusCore,
@@ -997,8 +997,8 @@ def get_url_occurrences(self, url: str, /, limit: int=20, cached_captures_only:
for capture in captures[:limit]:
ct = self.get_crawled_tree(capture.uuid)
to_append: dict[str, str | dict[str, Any]] = {'capture_uuid': capture.uuid,
-'start_timestamp': capture.timestamp.isoformat(),
-'title': capture.title}
+'start_timestamp': capture.timestamp.isoformat(),
+'title': capture.title}
urlnodes: dict[str, dict[str, str]] = {}
for urlnode in ct.root_hartree.url_tree.search_nodes(name=url):
urlnodes[urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(),
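The hunk above only re-indents the dict literal built inside `get_url_occurrences` (a whitespace-only change, hence the identical removed and added lines). For context, a hedged sketch of calling the method; the `Lookyloo()` constructor arguments and the exact return type are not part of this diff:

```python
from lookyloo import Lookyloo

lookyloo = Lookyloo()
# Assumed shape, based on the dict built in the hunk above: one entry per
# cached capture that contains the requested URL.
for occurrence in lookyloo.get_url_occurrences('https://example.com', limit=5):
    print(occurrence['capture_uuid'], occurrence['start_timestamp'], occurrence['title'])
```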
10 changes: 6 additions & 4 deletions lookyloo/modules/abstractmodule.py
@@ -1,9 +1,11 @@
#!/usr/bin/env python3

+from __future__ import annotations

import logging

from abc import ABC, abstractmethod
-from typing import Optional, Dict, Any
+from typing import Any

from ..default import get_config

@@ -13,11 +15,11 @@
class AbstractModule(ABC):
'''Just a simple abstract for the modules to catch issues with initialization'''

-def __init__(self, /, *, config_name: Optional[str]=None,
-config: Optional[Dict[str, Any]]=None):
+def __init__(self, /, *, config_name: str | None=None,
+config: dict[str, Any] | None=None):
self.logger = logging.getLogger(f'{self.__class__.__name__}')
self.logger.setLevel(get_config('generic', 'loglevel'))
-self.config: Dict[str, Any] = {}
+self.config: dict[str, Any] = {}
self._available = False
if config_name:
try:
2 changes: 1 addition & 1 deletion lookyloo/modules/circlpdns.py
@@ -5,7 +5,7 @@
import json

from datetime import date
-from typing import Dict, List, Optional, TYPE_CHECKING
+from typing import TYPE_CHECKING
from urllib.parse import urlparse

from pypdns import PyPDNS, PDNSRecord # type: ignore[attr-defined]
… (diff truncated: the remaining changed files are not shown here)
