Skip to content

Commit

Permalink
Merge pull request #1 from Wason1797/make-brs-forwarders
Browse files Browse the repository at this point in the history
Refactor BR functionality and add SVM41 support
  • Loading branch information
Wason1797 authored Nov 13, 2024
2 parents b769efd + 638f13b commit c892eb0
Show file tree
Hide file tree
Showing 19 changed files with 2,269 additions and 2,313 deletions.
4,046 changes: 2,025 additions & 2,021 deletions dashboard/dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ COPY . .

RUN poetry install --only main

EXPOSE 5683/udp

ENTRYPOINT ["bash", "runner.sh"]
2 changes: 1 addition & 1 deletion server/app/config/env_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Settings(BaseSettings):
STATION_TYPE: StationType
VERSION: str
ENV: str = "DEV"
BORDER_ROUTER_ID: Optional[int] = None
SEVRER_INSTANCE_ID: int
REPLICATION: ReplicationType = ReplicationType.NONE
WRITE_TO_BACKUP: bool = True
MAIN_SERVER_URI: Optional[str] = None
Expand Down
50 changes: 20 additions & 30 deletions server/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import aiocoap.resource as resource # type: ignore

from app.config.env_manager import get_settings
from app.controllers.border_router import BorderRouterController
from app.log import log
from app.managers.aiq_manager import AiqDataManager
from app.managers.br_manager import BorderRouterManager
from app.managers.station_manager import StationManager
from app.repositories.aiq_coap.client import CoapClient
from app.repositories.mysql.database import MysqlConnector
from app.repositories.postgres.database import PostgresqlConnector
from app.resources import AiqDataResource, AiqManagementSummaryResource, AiqManagementTruncateResource, IndexResource
from app.resources import StationDataStorageResource, IndexResource
from app.resources.station_data_forwarder import StationDataForwarderResource
from app.security.payload_validator import PayloadValidator
from app.telegram.bot import ManagementBot

Expand All @@ -41,40 +41,33 @@ async def main() -> None:
log.info("Initializing COAP resources")
binds = ("localhost", None) if EnvManager.is_dev() else None
main_coap_context = await aiocoap.Context.create_server_context(None, bind=binds)
main_coap_client = CoapClient.get_instance(EnvManager.MAIN_SERVER_URI, main_coap_context)

server = resource.Site()
server.add_resource(
["aiq-data"],
AiqDataResource(
EnvManager.is_main_server(),
aiq_data_resource = None

if EnvManager.is_main_server():
aiq_data_resource = StationDataStorageResource(
PostgresqlConnector.get_session,
MysqlConnector.get_session,
main_coap_client,
PayloadValidator,
EnvManager.BORDER_ROUTER_ID,
EnvManager.allow_messages_from_br(),
EnvManager.SEVRER_INSTANCE_ID,
EnvManager.allow_backups(),
),
)

server.add_resource(
["index"],
IndexResource(EnvManager.VERSION),
)

if not EnvManager.is_main_server(): # Enable management interface for border routers
log.info("Initializing BR Management interface")
assert EnvManager.BORDER_ROUTER_ID
server.add_resource(
["aiq-management", "summary"],
AiqManagementSummaryResource(EnvManager.BORDER_ROUTER_ID, PostgresqlConnector.get_session),
)
server.add_resource(
["aiq-management", "truncate"],
AiqManagementTruncateResource(EnvManager.BORDER_ROUTER_ID, PostgresqlConnector.get_session),

else:
assert EnvManager.MAIN_SERVER_URI, "Border Routers need MAIN_SERVER_URI to be valid"
main_coap_client = CoapClient.get_instance(EnvManager.MAIN_SERVER_URI, main_coap_context)
aiq_data_resource = StationDataForwarderResource(
MysqlConnector.get_session,
PayloadValidator,
EnvManager.SEVRER_INSTANCE_ID,
main_coap_client,
EnvManager.allow_backups(),
)

server.add_resource(["aiq-data"], aiq_data_resource)
server.add_resource(["index"], IndexResource(EnvManager.VERSION))

# Register bot commands to manage border routers and main server
if EnvManager.is_main_server():
log.info("Initializing Telegram BOT Commands")
Expand All @@ -91,9 +84,6 @@ async def main() -> None:
"register_br",
partial(BorderRouterManager.register_border_router, PostgresqlConnector.get_session, MysqlConnector.get_session),
)
ManagementBot.register_commad(
"truncate", partial(BorderRouterController.truncate_br_database, PostgresqlConnector.get_session, main_coap_context)
)

log.info("Starting AIQ Server")
asyncio.get_running_loop().add_signal_handler(signal.SIGINT, lambda: sys.exit(0))
Expand Down
13 changes: 10 additions & 3 deletions server/app/managers/aiq_manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from datetime import datetime, timezone
from typing import Optional


from app.types import AsyncSessionMaker
from sqlalchemy import delete, func, select

from app.repositories.db.models import ENS160Data, SCD41Data, StationData
from app.repositories.db.models import ENS160Data, SCD41Data, StationData, SVM41Data
from app.serializers.request import AiqDataFromStation
from app.types import AsyncSessionMaker

SUMMARY_TEMPLATE = """
Row Count: {}
Expand Down Expand Up @@ -84,6 +83,14 @@ async def save_sensor_data(
if data.ens160_d:
sensor_data.ens160_data = ENS160Data(eco2=data.ens160_d.eco2, tvoc=data.ens160_d.tvoc, aqi=data.ens160_d.aqi)

if data.svm41_d:
sensor_data.svm41_data = SVM41Data(
temperature=data.svm41_d.temp.to_str_number(),
humidity=data.svm41_d.hum.to_str_number(),
nox_index=data.svm41_d.nox.to_str_number(),
voc_index=data.svm41_d.voc.to_str_number(),
)

async with session_maker() as session:
session.add(sensor_data)
await session.commit()
Expand Down
13 changes: 4 additions & 9 deletions server/app/repositories/aiq_coap/client.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
from aiocoap import Context, Message, PUT, GET, DELETE # type: ignore

from typing import Optional


class CoapClient:
class Response:
def __init__(self, code: int, payload: str) -> None:
self.code = code
self.payload = payload

@classmethod
def get_instance(cls, server_uri: Optional[str], context: Context) -> Optional["CoapClient"]:
if not server_uri:
return None

return cls(server_uri, context)

def __init__(self, server_uri: str, context: Context) -> None:
self.server_uri = server_uri
self.context = context

@classmethod
def get_instance(cls, server_uri: str, context: Context) -> "CoapClient":
return cls(server_uri, context)

async def put_payload(self, payload: str) -> Response:
request = Message(code=PUT, payload=payload.encode(encoding="ascii"), uri=self.server_uri)

Expand Down
13 changes: 5 additions & 8 deletions server/app/repositories/aiq_coap/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import traceback

from app.log import log
from app.telegram.bot import ManagementBot

from .client import CoapClient

Expand All @@ -11,23 +10,21 @@ class AiqDataCoapForwarder:
background_tasks: set = set()

@classmethod
def forward_aiq_data(cls, coap_client: CoapClient, data: str, border_router_id: int) -> None:
def forward_aiq_data(cls, coap_client: CoapClient, data: str, border_router_id: int) -> asyncio.Task:
async def forward_data_task() -> None:
try:
log.info("Forwarding")
response = await coap_client.put_payload(f"{data}|{border_router_id}")
await asyncio.sleep(0.01)
log.info(f"Forwarded {response.code}, {response.payload}")
except Exception:
trace = traceback.format_exc()
await ManagementBot.send_notification(
f"An error occurred while forwarding to {coap_client.server_uri}:\n {trace}"
)
except Exception as ex:
log.exception("Error while forwarding payload")
raise ex

task = asyncio.create_task(forward_data_task()) # Schedule the task in the background
cls.background_tasks.add(task)
task.add_done_callback(cls.background_tasks.discard)
return None
return task


class AiqBorderRouterCoapClient:
Expand Down
5 changes: 2 additions & 3 deletions server/app/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from .aiq_data import AiqDataResource # noqa
from .aiq_management import AiqManagementSummaryResource, AiqManagementTruncateResource # noqa
from .index import IndexResource
from .station_data_storage import StationDataStorageResource # noqa
from .index import IndexResource # noqa
53 changes: 0 additions & 53 deletions server/app/resources/aiq_management.py

This file was deleted.

53 changes: 53 additions & 0 deletions server/app/resources/station_data_forwarder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import asyncio
from typing import Type, cast

import aiocoap.resource as resource # type: ignore
from aiocoap import CHANGED, INTERNAL_SERVER_ERROR, Message # type: ignore

from app.log import log
from app.managers.aiq_manager import AiqDataManager
from app.repositories.aiq_coap.client import CoapClient
from app.repositories.aiq_coap.managers import AiqDataCoapForwarder
from app.security.payload_validator import PayloadValidator
from app.serializers.request import AiqDataFromStation
from app.types import AsyncSessionMaker


class StationDataForwarderResource(resource.Resource):
def __init__(
self,
backup_session: AsyncSessionMaker,
payload_validator: Type[PayloadValidator],
server_instance_id: int,
coap_client: CoapClient,
allow_backups: bool = False,
):
super().__init__()
self.backup_session = backup_session
self.payload_validator = payload_validator
self.coap_client = coap_client
self.server_instance_id = server_instance_id
self.allow_backups = allow_backups

async def render_put(self, request) -> Message:
try:
payload: str = request.payload.decode("ascii")
log.info(f"[COAP] forwarder request {payload}")

validated_payload = self.payload_validator.validate(payload, AiqDataFromStation)

sensor_data = cast(AiqDataFromStation, validated_payload.data)
border_router_id = validated_payload.border_router_id or self.server_instance_id

AiqDataCoapForwarder.forward_aiq_data(self.coap_client, payload, border_router_id)

if self.allow_backups:
await AiqDataManager.save_sensor_data(self.backup_session, sensor_data, border_router_id)
else:
await asyncio.sleep(0.01) # Await something so the background task has time to schedule/run.

except Exception:
log.exception("Error in StationDataForwarderResource", exc_info=True)
return Message(code=INTERNAL_SERVER_ERROR, payload="Error forwarding data")

return Message(code=CHANGED, payload="")
Original file line number Diff line number Diff line change
Expand Up @@ -6,68 +6,56 @@

from app.log import log
from app.managers.aiq_manager import AiqDataManager
from app.repositories.aiq_coap.client import CoapClient
from app.repositories.aiq_coap.managers import AiqDataCoapForwarder
from app.security.payload_validator import PayloadValidator
from app.serializers.request import AiqDataFromStation
from app.telegram.bot import ManagementBot
from app.types import AsyncSessionMaker


class AiqDataResource(resource.Resource):
class StationDataStorageResource(resource.Resource):
def __init__(
self,
is_main_server: bool,
main_session: AsyncSessionMaker,
backup_session: AsyncSessionMaker,
coap_client: Optional[CoapClient],
payload_validator: Type[PayloadValidator],
border_router_id: Optional[int] = None,
allow_messages_from_br: bool = False,
server_instance_id: Optional[int] = None,
allow_backups: bool = False,
):
super().__init__()
self.is_main_server = is_main_server
self.border_router_id = border_router_id
self.main_session = main_session
self.backup_session = backup_session
self.coap_client = coap_client
self.server_instance_id = server_instance_id
self.payload_validator = payload_validator
self.allow_messages_from_br = allow_messages_from_br
self.allow_backups = allow_backups

async def render_put(self, request) -> Message:
try:
payload: str = request.payload.decode("ascii")
log.info(f"[COAP] got request {payload}")
log.info(f"[COAP] storage request {payload}")

validated_payload = self.payload_validator.validate(
payload, AiqDataFromStation, self.is_main_server, self.allow_messages_from_br
)
validated_payload = self.payload_validator.validate(payload, AiqDataFromStation)

sensor_data = cast(AiqDataFromStation, validated_payload.data)
border_router_id = validated_payload.border_router_id or self.server_instance_id

if not self.is_main_server and self.coap_client:
AiqDataCoapForwarder.forward_aiq_data(self.coap_client, payload, self.border_router_id or 0)
# if not self.is_main_server and self.coap_client:
# AiqDataCoapForwarder.forward_aiq_data(self.coap_client, payload, self.border_router_id or 0)

try:
await AiqDataManager.save_sensor_data(
self.main_session, sensor_data, validated_payload.border_router_id or self.border_router_id
)
await AiqDataManager.save_sensor_data(self.main_session, sensor_data, border_router_id)
except Exception as ex:
ex.add_note("Could not store sensor data in main DB")
if self.allow_backups:
await AiqDataManager.save_sensor_data(
self.backup_session, sensor_data, validated_payload.border_router_id or self.border_router_id
)
await AiqDataManager.save_sensor_data(self.backup_session, sensor_data, border_router_id)

ex.add_note("Could not store sensor data in main DB")
raise ex

if self.is_main_server and self.allow_backups:
await AiqDataManager.save_sensor_data(self.backup_session, sensor_data, validated_payload.border_router_id)
if self.allow_backups:
await AiqDataManager.save_sensor_data(self.backup_session, sensor_data, border_router_id)

except Exception:
trace = traceback.format_exc()
await ManagementBot.send_notification(f"An error occurred in border router {self.border_router_id}:\n {trace}")
await ManagementBot.send_notification(f"An error occurred in server with id {self.server_instance_id}:\n {trace}")
return Message(code=INTERNAL_SERVER_ERROR, payload="")

return Message(code=CHANGED, payload="")
Loading

0 comments on commit c892eb0

Please sign in to comment.