new: Indexer for *all* the captures
Rafiot committed Mar 5, 2024
1 parent 2bbd35c commit e45b7c4
Showing 14 changed files with 1,450 additions and 568 deletions.
177 changes: 33 additions & 144 deletions bin/background_indexer.py
@@ -5,141 +5,50 @@
import logging
import logging.config
import os
import shutil

from datetime import datetime, timedelta
from pathlib import Path

from lookyloo import Lookyloo
from lookyloo import Lookyloo, Indexing
from lookyloo.default import AbstractManager, get_config
from lookyloo.exceptions import MissingUUID, NoValidHarFile
from lookyloo.helpers import is_locked, get_sorted_captures_from_disk, make_dirs_list
from lookyloo.exceptions import NoValidHarFile


logging.config.dictConfig(get_config('logging'))


class BackgroundIndexer(AbstractManager):

def __init__(self, loglevel: int | None=None):
def __init__(self, full: bool=False, loglevel: int | None=None):
super().__init__(loglevel)
self.lookyloo = Lookyloo()
self.script_name = 'background_indexer'
self.full_indexer = full
self.indexing = Indexing(full_index=self.full_indexer)
if self.full_indexer:
self.script_name = 'background_full_indexer'
else:
self.script_name = 'background_indexer'
# make sure discarded captures dir exists
self.discarded_captures_dir = self.lookyloo.capture_dir.parent / 'discarded_captures'
self.discarded_captures_dir.mkdir(parents=True, exist_ok=True)

def _to_run_forever(self) -> None:
all_done = self._build_missing_pickles()
if all_done:
self._check_indexes()
# Disable probabilistic indexing for now, mmh3 isn't a fuzzy hash algo.
# self._check_probabilistic_indexes()
self._check_indexes()
self.lookyloo.update_tree_cache_info(os.getpid(), self.script_name)

def _build_missing_pickles(self) -> bool:
self.logger.debug('Build missing pickles...')
# Sometimes, we have a huge backlog and the process might get stuck on old captures for a very long time
# This value makes sure we break out of the loop and build pickles of the most recent captures
max_captures = 50
got_new_captures = False

# Initialize time where we do not want to build the pickles anymore.
archive_interval = timedelta(days=get_config('generic', 'archive'))
cut_time = (datetime.now() - archive_interval)
for month_dir in make_dirs_list(self.lookyloo.capture_dir):
__counter_shutdown = 0
for capture_time, path in sorted(get_sorted_captures_from_disk(month_dir, cut_time=cut_time, keep_more_recent=True), reverse=True):
__counter_shutdown += 1
if __counter_shutdown % 10 and self.shutdown_requested():
self.logger.warning('Shutdown requested, breaking.')
return False
if ((path / 'tree.pickle.gz').exists() or (path / 'tree.pickle').exists()):
# We already have a pickle file
self.logger.debug(f'{path} has a pickle.')
continue
if not list(path.rglob('*.har.gz')) and not list(path.rglob('*.har')):
# No HAR file
self.logger.debug(f'{path} has no HAR file.')
continue

if is_locked(path):
# it is really locked
self.logger.debug(f'{path} is locked, pickle generated by another process.')
continue

with (path / 'uuid').open() as f:
uuid = f.read()

if not self.lookyloo.redis.hexists('lookup_dirs', uuid):
# The capture with this UUID exists, but it is for some reason missing in lookup_dirs
self.lookyloo.redis.hset('lookup_dirs', uuid, str(path))
else:
cached_path = Path(self.lookyloo.redis.hget('lookup_dirs', uuid)) # type: ignore[arg-type]
if cached_path != path:
# we have a duplicate UUID, it is probably related to some bad copy/paste
if cached_path.exists():
# Both paths exist, move the one that isn't in lookup_dirs
self.logger.critical(f'Duplicate UUID for {uuid} in {cached_path} and {path}, discarding the latest')
try:
shutil.move(str(path), str(self.discarded_captures_dir / path.name))
except FileNotFoundError as e:
self.logger.warning(f'Unable to move capture: {e}')
continue
else:
# The path in lookup_dirs for that UUID doesn't exist, just update it.
self.lookyloo.redis.hset('lookup_dirs', uuid, str(path))

try:
self.logger.info(f'Build pickle for {uuid}: {path.name}')
self.lookyloo.get_crawled_tree(uuid)
self.lookyloo.trigger_modules(uuid, auto_trigger=True)
self.logger.info(f'Pickle for {uuid} built.')
got_new_captures = True
max_captures -= 1
except MissingUUID:
self.logger.warning(f'Unable to find {uuid}. That should not happen.')
except NoValidHarFile as e:
self.logger.critical(f'There are no HAR files in the capture {uuid}: {path.name} - {e}')
except FileNotFoundError:
self.logger.warning(f'Capture {uuid} disappeared during processing, probably archived.')
except Exception:
self.logger.exception(f'Unable to build pickle for {uuid}: {path.name}')
# The capture is not working, moving it away.
try:
shutil.move(str(path), str(self.discarded_captures_dir / path.name))
self.lookyloo.redis.hdel('lookup_dirs', uuid)
except FileNotFoundError as e:
self.logger.warning(f'Unable to move capture: {e}')
continue
if max_captures <= 0:
self.logger.info('Too many captures in the backlog, start from the beginning.')
return False
if got_new_captures:
self.logger.info('Finished building all missing pickles.')
# Only return True if we built new pickles.
return True
return False

def _check_indexes(self) -> None:
index_redis = self.lookyloo.indexing.redis
can_index = index_redis.set('ongoing_indexing', 1, ex=3600, nx=True)
if not can_index:
if not self.indexing.can_index:
# There is no reason to run this method in multiple scripts.
self.logger.info('Indexing already ongoing in another process.')
return None
self.logger.info('Check indexes...')
self.logger.info(f'Check {self.script_name}...')
for cache in self.lookyloo.sorted_capture_cache(cached_captures_only=False):
if self.lookyloo.is_public_instance and cache.no_index:
# Capture unindexed
if not self.full_indexer:
# If we're not running the full indexer, check if the capture should be indexed.
if self.lookyloo.is_public_instance and cache.no_index:
# Capture unindexed
continue
if not cache.tree_ready:
# pickle isn't ready, we can't index.
continue
p = index_redis.pipeline()
p.sismember('indexed_urls', cache.uuid)
p.sismember('indexed_body_hashes', cache.uuid)
p.sismember('indexed_cookies', cache.uuid)
p.sismember('indexed_hhhashes', cache.uuid)
p.sismember('indexed_favicons', cache.uuid)
indexed = p.execute()
indexed = self.indexing.capture_indexed(cache.uuid)
if all(indexed):
continue
try:
@@ -151,50 +60,23 @@ def _check_indexes(self) -> None:

if not indexed[0]:
self.logger.info(f'Indexing urls for {cache.uuid}')
self.lookyloo.indexing.index_url_capture(ct)
self.indexing.index_url_capture(ct)
if not indexed[1]:
self.logger.info(f'Indexing resources for {cache.uuid}')
self.lookyloo.indexing.index_body_hashes_capture(ct)
self.indexing.index_body_hashes_capture(ct)
if not indexed[2]:
self.logger.info(f'Indexing cookies for {cache.uuid}')
self.lookyloo.indexing.index_cookies_capture(ct)
self.indexing.index_cookies_capture(ct)
if not indexed[3]:
self.logger.info(f'Indexing HH Hashes for {cache.uuid}')
self.lookyloo.indexing.index_http_headers_hashes_capture(ct)
self.indexing.index_http_headers_hashes_capture(ct)
if not indexed[4]:
self.logger.info(f'Indexing favicons for {cache.uuid}')
favicons = self.lookyloo.get_potential_favicons(cache.uuid, all_favicons=True, for_datauri=False)
self.lookyloo.indexing.index_favicons_capture(cache.uuid, favicons)
self.indexing.index_favicons_capture(cache.uuid, favicons)
# NOTE: categories aren't taken into account here, should be fixed(?)
# see indexing.index_categories_capture(capture_uuid, categories)
index_redis.delete('ongoing_indexing')
self.logger.info('... done.')

def _check_probabilistic_indexes(self) -> None:
index_redis = self.lookyloo.indexing.redis
can_index = index_redis.set('ongoing_probalistic_indexing', 1, ex=3600, nx=True)
if not can_index:
# There is no reason to run this method in multiple scripts.
self.logger.info('Probalistic indexing already ongoing in another process.')
return None
self.logger.info('Check probabilistic indexes...')
algorithms = ['mmh3-shodan']
for cache in self.lookyloo.sorted_capture_cache(cached_captures_only=False):
if self.lookyloo.is_public_instance and cache.no_index:
# Capture unindexed
continue
p = index_redis.pipeline()
for algorithm in algorithms:
p.sismember(f'indexed_favicons_probabilistic|{algorithm}', cache.uuid)
indexed = p.execute()
if all(indexed):
continue
for i, algorithm in enumerate(algorithms):
if not indexed[i]:
self.logger.info(f'Probabilistic indexing favicons for {cache.uuid} with {algorithm}')
favicons = self.lookyloo.get_potential_favicons(cache.uuid, all_favicons=True, for_datauri=False)
self.lookyloo.indexing.index_favicons_probabilistic(cache.uuid, favicons, algorithm)
index_redis.delete('ongoing_probalistic_indexing')
self.indexing.indexing_done()
self.logger.info('... done.')


@@ -203,5 +85,12 @@ def main() -> None:
i.run(sleep_in_sec=60)


def main_full_indexer() -> None:
if not get_config('generic', 'index_everything'):
raise Exception('Full indexer is disabled.')
i = BackgroundIndexer(full=True)
i.run(sleep_in_sec=60)


if __name__ == '__main__':
main()
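
The refactor moves the "one indexer at a time" locking out of this script and into the Indexing class: the inline ongoing_indexing Redis key is replaced by self.indexing.can_index and self.indexing.indexing_done(). The lookyloo/indexing.py side of that change is not part of this file's diff, so the snippet below is only a sketch of what such lock handling could look like, reusing the exact Redis pattern removed from _check_indexes() above; the class name, lock-key naming, and socket path are assumptions.

# Hedged sketch only: not the actual lookyloo.Indexing implementation.
from redis import Redis


class IndexingSketch:

    def __init__(self, full_index: bool=False) -> None:
        self.full_index = full_index
        # Assumption: each indexer variant uses its own lock key so the
        # normal and full indexers do not block each other.
        self._lock_key = 'ongoing_full_indexing' if full_index else 'ongoing_indexing'
        self.redis = Redis(unix_socket_path='indexing.sock', decode_responses=True)

    @property
    def can_index(self) -> bool:
        # Same pattern the old inline code used: a short-lived NX lock so
        # only one process indexes at a time.
        return bool(self.redis.set(self._lock_key, 1, ex=3600, nx=True))

    def indexing_done(self) -> None:
        # Release the lock once the indexing pass is finished.
        self.redis.delete(self._lock_key)
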
23 changes: 22 additions & 1 deletion bin/run_backend.py
@@ -11,7 +11,7 @@
from redis import Redis
from redis.exceptions import ConnectionError

from lookyloo.default import get_homedir, get_socket_path
from lookyloo.default import get_homedir, get_socket_path, get_config


def check_running(name: str) -> bool:
@@ -55,13 +55,32 @@ def shutdown_indexing(storage_directory: Path | None=None) -> None:
print('Redis indexing database shutdown.')


def launch_full_index(storage_directory: Path | None=None) -> None:
if not storage_directory:
storage_directory = get_homedir()
if not check_running('full_index'):
Popen(["./run_kvrocks.sh"], cwd=(storage_directory / 'full_index'))


def shutdown_full_index(storage_directory: Path | None=None) -> None:
if not storage_directory:
storage_directory = get_homedir()
r = Redis(unix_socket_path=get_socket_path('full_index'))
r.shutdown(save=True)
print('Kvrocks full indexing database shutdown.')


def launch_all() -> None:
launch_cache()
launch_indexing()
if get_config('generic', 'index_everything'):
launch_full_index()


def check_all(stop: bool=False) -> None:
backends: dict[str, bool] = {'cache': False, 'indexing': False}
if get_config('generic', 'index_everything'):
backends['full_index'] = False
while True:
for db_name in backends.keys():
try:
@@ -85,6 +104,8 @@ def check_all(stop: bool=False) -> None:
def stop_all() -> None:
shutdown_cache()
shutdown_indexing()
if get_config('generic', 'index_everything'):
shutdown_full_index()


def main() -> None:
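
run_backend.py now also manages a dedicated kvrocks instance for the full index, started via ./run_kvrocks.sh in the full_index directory and reachable over the full_index unix socket. A quick sanity check that the new backend answers might look like the sketch below, reusing the same helpers this script imports; the ping-based check itself is an assumption, not part of the commit.

from redis import Redis

from lookyloo.default import get_socket_path

# Connect to the unix socket the new full-index kvrocks instance listens on
# and make sure it responds before starting the full indexer.
r = Redis(unix_socket_path=get_socket_path('full_index'))
print('full_index reachable:', r.ping())
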
9 changes: 8 additions & 1 deletion bin/start.py
@@ -2,7 +2,7 @@

from subprocess import Popen, run

from lookyloo.default import get_homedir
from lookyloo.default import get_homedir, get_config


def main() -> None:
@@ -18,9 +18,16 @@ def main() -> None:
print('Start asynchronous ingestor...')
Popen(['async_capture'])
print('done.')
print('Start background capture builder...')
Popen(['background_build_captures'])
print('done.')
print('Start background indexer...')
Popen(['background_indexer'])
print('done.')
if get_config('generic', 'index_everything'):
print('Start background full indexer...')
Popen(['background_full_indexer'])
print('done.')
print('Start background processing...')
Popen(['processing'])
print('done.')
4 changes: 3 additions & 1 deletion config/generic.json.sample
@@ -79,6 +79,7 @@
"bucket_name": ""
}
},
"index_everything": false,
"_notes": {
"loglevel": "(lookyloo) Can be one of the value listed here: https://docs.python.org/3/library/logging.html#levels",
"only_global_lookups": "Set it to True if your instance is publicly available so users aren't able to scan your internal network",
@@ -110,6 +111,7 @@
"archive": "The captures older than this value (in days) will be archived. They're not cached by default in the Lookyloo class.",
"max_capture_time": "The very maximal time we allow a capture to keep going. Should only be triggered by captures that cause playwright to never quit.",
"max_tree_create_time": "The max time the generation of a tree is allowed to take",
"s3fs": "The config to access a S3FS instance with the s3fs python module - it is not integrated properly for now as it requires urllib < 2.0 which is a non-started at this stage."
"s3fs": "The config to access a S3FS instance with the s3fs python module - it is not integrated properly for now as it requires urllib < 2.0 which is a non-started at this stage.",
"index_everything": "If true, index every capture, even if it's not public. This feature requires a dedicated kvrocks instance, and is only accessible when logged-in as admin."
}
}