diff --git a/bin/background_build_captures.py b/bin/background_build_captures.py index 3915e9fa..ba36b3bf 100755 --- a/bin/background_build_captures.py +++ b/bin/background_build_captures.py @@ -45,7 +45,7 @@ def __auto_report(self, path: Path) -> None: # could be an empty file. settings = json.loads(ar) try: - self.lookyloo.send_mail(capture_uuid, email=settings.get('email', ''), + self.lookyloo.send_mail(capture_uuid, as_admin=True, email=settings.get('email', ''), comment=settings.get('comment')) (path / 'auto_report').unlink() except Exception as e: diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index 098a7c00..da0abc70 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -882,7 +882,7 @@ def modules_filtered(self, capture_uuid: str, /) -> str | None: return f"Malicious capture according to {len(modules)} module(s): {', '.join(modules)}" - def send_mail(self, capture_uuid: str, /, email: str | None=None, comment: str | None=None) -> bool | dict[str, Any]: + def send_mail(self, capture_uuid: str, /, as_admin: bool, email: str | None=None, comment: str | None=None) -> bool | dict[str, Any]: '''Send an email notification regarding a specific capture''' if not get_config('generic', 'enable_mail_notification'): return {"error": "Unable to send mail: mail notification disabled"} @@ -916,7 +916,9 @@ def send_mail(self, capture_uuid: str, /, email: str | None=None, comment: str | self.logger.info('There are no MISP instances available for a lookup.') else: for instance_name in self.misps.keys(): - if occurrences := self.get_misp_occurrences(capture_uuid, instance_name=instance_name): + if occurrences := self.get_misp_occurrences(capture_uuid, + as_admin=as_admin, + instance_name=instance_name): elements, misp_url = occurrences for event_id, attributes in elements.items(): for value, ts in attributes: @@ -1225,7 +1227,8 @@ def misp_export(self, capture_uuid: str, /, with_parent: bool=False, *, as_admin return [event] - def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | None=None) -> tuple[dict[int, set[tuple[str, datetime]]], str] | None: + def get_misp_occurrences(self, capture_uuid: str, /, as_admin: bool, + *, instance_name: str | None=None) -> tuple[dict[int, set[tuple[str, datetime]]], str] | None: if instance_name is None: misp = self.misps.default_misp elif self.misps.get(instance_name) is not None: @@ -1244,7 +1247,7 @@ def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | Non nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node] to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set) for node in nodes_to_lookup: - hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid)) + hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid), as_admin=as_admin) for event_id, values in hits.items(): if not isinstance(event_id, int) or not isinstance(values, set): continue diff --git a/lookyloo/modules/misp.py b/lookyloo/modules/misp.py index d80b35e3..f6fe22c8 100644 --- a/lookyloo/modules/misp.py +++ b/lookyloo/modules/misp.py @@ -57,6 +57,20 @@ def module_init(self) -> bool: return True + @property + def has_public_misp(self) -> bool: + return not all(misp.admin_only for misp in self.__misps.values()) + + def has_lookup(self, as_admin: bool) -> bool: + if as_admin: + return any(misp.enable_lookup for misp in self.__misps.values()) + return any(misp.enable_lookup and not misp.admin_only for misp in self.__misps.values()) + + def has_push(self, as_admin: bool) -> bool: + if as_admin: + return any(misp.enable_push for misp in self.__misps.values()) + return any(misp.enable_push and not misp.admin_only for misp in self.__misps.values()) + def __getitem__(self, name: str) -> MISP: return self.__misps[name] @@ -200,7 +214,7 @@ def _prepare_push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates: existing_uuid_to_extend = None for event in events: if not allow_duplicates: - existing_event = self.get_existing_event(event.attributes[0].value) + existing_event = self.__get_existing_event(event.attributes[0].value) if existing_event: existing_uuid_to_extend = existing_event.uuid continue @@ -215,38 +229,44 @@ def _prepare_push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates: events_to_push.append(event) return events_to_push - def push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates: bool=False, auto_publish: bool | None=None) -> list[MISPEvent] | dict[Any, Any]: + def push(self, to_push: list[MISPEvent] | MISPEvent, as_admin: bool, *, allow_duplicates: bool=False, + auto_publish: bool | None=None) -> list[MISPEvent] | dict[Any, Any]: + if not self.available: + return {'error': 'Module not available.'} + if not self.enable_push: + return {'error': 'Push not enabled.'} + if self.admin_only and not as_admin: + return {'error': 'Admin only module, cannot push.'} + if auto_publish is None: auto_publish = self.auto_publish - if self.available and self.enable_push: - events = self._prepare_push(to_push, allow_duplicates, auto_publish) - if not events: - return {'error': 'All the events are already on the MISP instance.'} - if isinstance(events, dict): - return {'error': events} - to_return = [] - for event in events: - try: - # NOTE: POST the event as published publishes inline, which can tak a long time. - # Here, we POST as not published, and trigger the publishing in a second call. - if hasattr(event, 'published'): - background_publish = event.published - else: - background_publish = False - if background_publish: - event.published = False - new_event = self.client.add_event(event, pythonify=True) - if background_publish and isinstance(new_event, MISPEvent): - self.client.publish(new_event) - except requests.exceptions.ReadTimeout: - return {'error': 'The connection to MISP timed out, try increasing the timeout in the config.'} - if isinstance(new_event, MISPEvent): - to_return.append(new_event) + + events = self._prepare_push(to_push, allow_duplicates, auto_publish) + if not events: + return {'error': 'All the events are already on the MISP instance.'} + if isinstance(events, dict): + return {'error': events} + to_return = [] + for event in events: + try: + # NOTE: POST the event as published publishes inline, which can tak a long time. + # Here, we POST as not published, and trigger the publishing in a second call. + if hasattr(event, 'published'): + background_publish = event.published else: - return {'error': new_event} - return to_return - else: - return {'error': 'Module not available or push not enabled.'} + background_publish = False + if background_publish: + event.published = False + new_event = self.client.add_event(event, pythonify=True) + if background_publish and isinstance(new_event, MISPEvent): + self.client.publish(new_event) + except requests.exceptions.ReadTimeout: + return {'error': 'The connection to MISP timed out, try increasing the timeout in the config.'} + if isinstance(new_event, MISPEvent): + to_return.append(new_event) + else: + return {'error': new_event} + return to_return def get_existing_event_url(self, permaurl: str) -> str | None: attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True) @@ -255,7 +275,7 @@ def get_existing_event_url(self, permaurl: str) -> str | None: url = f'{self.client.root_url}/events/{attributes[0].event_id}' return url - def get_existing_event(self, permaurl: str) -> MISPEvent | None: + def __get_existing_event(self, permaurl: str) -> MISPEvent | None: attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True) if not attributes or not isinstance(attributes, list) or not isinstance(attributes[0], MISPAttribute): return None @@ -264,36 +284,40 @@ def get_existing_event(self, permaurl: str) -> MISPEvent | None: return event return None - def lookup(self, node: URLNode, hostnode: HostNode) -> dict[int | str, str | set[tuple[str, datetime]]]: - if self.available and self.enable_lookup: - tld = self.psl.publicsuffix(hostnode.name) - domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1] - to_lookup = [node.name, hostnode.name, f'{domain}.{tld}'] - if hasattr(hostnode, 'resolved_ips'): - if 'v4' in hostnode.resolved_ips: - to_lookup += hostnode.resolved_ips['v4'] - if 'v6' in hostnode.resolved_ips: - to_lookup += hostnode.resolved_ips['v6'] - if hasattr(hostnode, 'cnames'): - to_lookup += hostnode.cnames - if not node.empty_response: - to_lookup.append(node.body_hash) - if attributes := self.client.search(controller='attributes', value=to_lookup, - enforce_warninglist=True, pythonify=True): - if isinstance(attributes, list): - to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set) - a: MISPAttribute - for a in attributes: # type: ignore[assignment] - if isinstance(a.value, str): - # a.timestamp is always a datetime in this situation - to_return[a.event_id].add((a.value, a.timestamp)) # type: ignore[arg-type] - else: - # This shouldn't happen (?) - self.logger.warning(f'Unexpected value type in MISP lookup: {type(a.value)}') - return to_return # type: ignore[return-value] - else: - # The request returned an error - return attributes # type: ignore[return-value] - return {'info': 'No hits.'} - else: - return {'error': 'Module not available or lookup not enabled.'} + def lookup(self, node: URLNode, hostnode: HostNode, as_admin: bool) -> dict[int | str, str | set[tuple[str, datetime]]]: + if not self.available: + return {'error': 'Module not available.'} + if not self.enable_lookup: + return {'error': 'Lookup not enabled.'} + if self.admin_only and not as_admin: + return {'error': 'Admin only module, cannot lookup.'} + + tld = self.psl.publicsuffix(hostnode.name) + domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1] + to_lookup = [node.name, hostnode.name, f'{domain}.{tld}'] + if hasattr(hostnode, 'resolved_ips'): + if 'v4' in hostnode.resolved_ips: + to_lookup += hostnode.resolved_ips['v4'] + if 'v6' in hostnode.resolved_ips: + to_lookup += hostnode.resolved_ips['v6'] + if hasattr(hostnode, 'cnames'): + to_lookup += hostnode.cnames + if not node.empty_response: + to_lookup.append(node.body_hash) + if attributes := self.client.search(controller='attributes', value=to_lookup, + enforce_warninglist=True, pythonify=True): + if isinstance(attributes, list): + to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set) + a: MISPAttribute + for a in attributes: # type: ignore[assignment] + if isinstance(a.value, str): + # a.timestamp is always a datetime in this situation + to_return[a.event_id].add((a.value, a.timestamp)) # type: ignore[arg-type] + else: + # This shouldn't happen (?) + self.logger.warning(f'Unexpected value type in MISP lookup: {type(a.value)}') + return to_return # type: ignore[return-value] + else: + # The request returned an error + return attributes # type: ignore[return-value] + return {'info': 'No hits.'} diff --git a/website/web/__init__.py b/website/web/__init__.py index 75d0061a..d1243ba5 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -725,27 +725,47 @@ def stats(tree_uuid: str) -> str: @app.route('/tree//misp_lookup', methods=['GET']) -@flask_login.login_required # type: ignore[misc] def web_misp_lookup_view(tree_uuid: str) -> str | WerkzeugResponse | Response: if not lookyloo.misps.available: flash('There are no MISP instances available.', 'error') return redirect(url_for('tree', tree_uuid=tree_uuid)) + as_admin = flask_login.current_user.is_authenticated + if not as_admin and not lookyloo.misps.has_public_misp: + flash('You need to be authenticated to search on MISP.', 'error') + return redirect(url_for('tree', tree_uuid=tree_uuid)) + + if not as_admin and lookyloo.misps.default_misp.admin_only: + current_misp = None + else: + current_misp = lookyloo.misps.default_instance + misps_occurrences = {} - for instance_name in lookyloo.misps.keys(): - if occurrences := lookyloo.get_misp_occurrences(tree_uuid, instance_name=instance_name): + for instance_name, instance in lookyloo.misps.items(): + if instance.admin_only and not as_admin: + continue + if not current_misp: + # Pick the first one we can + current_misp = instance_name + if occurrences := lookyloo.get_misp_occurrences(tree_uuid, + as_admin=as_admin, + instance_name=instance_name): misps_occurrences[instance_name] = occurrences return render_template('misp_lookup.html', uuid=tree_uuid, - current_misp=lookyloo.misps.default_instance, + current_misp=current_misp, misps_occurrences=misps_occurrences) @app.route('/tree//misp_push', methods=['GET', 'POST']) -@flask_login.login_required # type: ignore[misc] -def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | None: +def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response: if not lookyloo.misps.available: flash('There are no MISP instances available.', 'error') return redirect(url_for('tree', tree_uuid=tree_uuid)) + as_admin = flask_login.current_user.is_authenticated + if not as_admin and not lookyloo.misps.has_public_misp: + flash('You need to be authenticated to push to MISP.', 'error') + return redirect(url_for('tree', tree_uuid=tree_uuid)) + event = lookyloo.misp_export(tree_uuid) if isinstance(event, dict): flash(f'Unable to generate the MISP export: {event}', 'error') @@ -754,7 +774,18 @@ def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | No if request.method == 'GET': # Initialize settings that will be displayed on the template misp_instances_settings = {} + if not as_admin and lookyloo.misps.default_misp.admin_only: + current_misp = None + else: + current_misp = lookyloo.misps.default_instance for name, instance in lookyloo.misps.items(): + if instance.admin_only and not as_admin: + continue + + if not current_misp: + # Pick the first one we can + current_misp = name + # the 1st attribute in the event is the link to lookyloo misp_instances_settings[name] = { 'default_tags': instance.default_tags, @@ -766,13 +797,13 @@ def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | No cache = lookyloo.capture_cache(tree_uuid) return render_template('misp_push_view.html', - current_misp=lookyloo.misps.default_instance, + current_misp=current_misp, tree_uuid=tree_uuid, event=event[0], misp_instances_settings=misp_instances_settings, has_parent=True if cache and cache.parent else False) - elif request.method == 'POST': + else: # event is a MISPEvent at this point misp_instance_name = request.form.get('misp_instance_name') if not misp_instance_name or misp_instance_name not in lookyloo.misps: @@ -808,8 +839,10 @@ def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | No events[-1].info = request.form.get('event_info') try: - new_events = misp.push(events, True if request.form.get('force_push') else False, - True if request.form.get('auto_publish') else False) + new_events = misp.push(events, as_admin=as_admin, + allow_duplicates=True if request.form.get('force_push') else False, + auto_publish=True if request.form.get('auto_publish') else False, + ) except MISPServerError: flash(f'MISP returned an error, the event(s) might still have been created on {misp.client.root_url}', 'error') else: @@ -819,7 +852,6 @@ def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | No for e in new_events: flash(f'MISP event {e.id} created on {misp.client.root_url}', 'success') return redirect(url_for('tree', tree_uuid=tree_uuid)) - return None @app.route('/tree//modules', methods=['GET']) @@ -1130,7 +1162,7 @@ def send_mail(tree_uuid: str) -> WerkzeugResponse: # skip clearly incorrect emails email = '' comment: str = request.form['comment'] if request.form.get('comment') else '' - lookyloo.send_mail(tree_uuid, email, comment) + lookyloo.send_mail(tree_uuid, as_admin=flask_login.current_user.is_authenticated, email=email, comment=comment) flash("Email notification sent", 'success') return redirect(url_for('tree', tree_uuid=tree_uuid)) @@ -1219,8 +1251,8 @@ def tree(tree_uuid: str, node_uuid: str | None=None) -> Response | str | Werkzeu enable_context_by_users=enable_context_by_users, enable_categorization=enable_categorization, enable_bookmark=enable_bookmark, - misp_push=lookyloo.misps.available and lookyloo.misps.default_misp.enable_push, - misp_lookup=lookyloo.misps.available and lookyloo.misps.default_misp.enable_lookup, + misp_push=lookyloo.misps.available and lookyloo.misps.has_push(flask_login.current_user.is_authenticated), + misp_lookup=lookyloo.misps.available and lookyloo.misps.has_lookup(flask_login.current_user.is_authenticated), blur_screenshot=blur_screenshot, urlnode_uuid=hostnode_to_highlight, auto_trigger_modules=auto_trigger_modules, confirm_message=confirm_message if confirm_message else 'Tick to confirm.', diff --git a/website/web/templates/misp_lookup.html b/website/web/templates/misp_lookup.html index 5d89141a..7b84374a 100644 --- a/website/web/templates/misp_lookup.html +++ b/website/web/templates/misp_lookup.html @@ -16,6 +16,7 @@
Skips the entries in warnings lists enabled on your MISP instance.
+{% if misps_occurrences %} {% for name, occurrences in misps_occurrences.items() %}
{% set hits, root_url = occurrences %} @@ -36,4 +37,7 @@
Skips the entries in warnings lists enabled on your MISP instance.
{% endif %}
{% endfor %} +{%else%} +No hits in any of the instances available. +{%endif%}
diff --git a/website/web/templates/misp_push_view.html b/website/web/templates/misp_push_view.html index 21623d47..f082b32f 100644 --- a/website/web/templates/misp_push_view.html +++ b/website/web/templates/misp_push_view.html @@ -12,6 +12,7 @@ {%endif%}
+{%if misp_instances_settings %} {%for name, misp_settings in misp_instances_settings.items() %}
{%endfor%} +{%else%} +None of the instances are available, please login. +{%endif%}
diff --git a/website/web/templates/tree.html b/website/web/templates/tree.html index e81ec78f..d84a3732 100644 --- a/website/web/templates/tree.html +++ b/website/web/templates/tree.html @@ -211,6 +211,10 @@ Index capture {% endif %} + {% if misp_lookup%} + Search events on MISP + {% endif %} Third Party Reports @@ -261,6 +265,10 @@ Download elements + {% if misp_push%} + Prepare push to MISP + {% endif %} @@ -271,14 +279,6 @@ Rebuild capture Hide capture Remove capture - {% if misp_push%} - Prepare push to MISP - {% endif %} - {% if misp_lookup%} - Search events on MISP - {% endif %} Logout