Skip to content

Commit

Permalink
improve python scripts with the logging module
Browse files Browse the repository at this point in the history
  • Loading branch information
TuritoYuenan committed Dec 22, 2024
1 parent bd692a6 commit a3c0b26
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 59 deletions.
2 changes: 1 addition & 1 deletion etl-pipeline/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .

# Run the function
CMD [ "python", "main.py" ]
CMD [ "python", "-u", "main.py" ]
99 changes: 50 additions & 49 deletions etl-pipeline/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import json
import json, logging
from influxdb_client.client.influxdb_client import InfluxDBClient
from influxdb_client.client.write.point import Point
from influxdb_client.client.write_api import SYNCHRONOUS
Expand All @@ -17,79 +17,80 @@
TOPIC = 'swinburne/hcmc_a35'


# MARK: Connection callback
def on_connect(client: Client, userdata, flags, reason_code, properties):
"""Routine to do when connected to the MQTT broker
Args:
client (Client): MQTT Client
userdata (InfluxDBClient): Client to save data
flags (unknown): Program flags
reason_code (_type_): _description_
properties (_type_): _description_
def extract(message: MQTTMessage) -> dict:
"""Extract data from MQTT broker
"""
print('Connected to broker')

client.subscribe(TOPIC)
print(f'Subscribed to {TOPIC} topic')
payload = message.payload.decode('utf-8')
return json.loads(payload)


# MARK: Message callback
def on_message(client: Client, userdata: InfluxDBClient, message: MQTTMessage):
"""Routine to do when received an MQTT message
Args:
client (Client): MQTT Client
user_data (InfluxDBClient): Client to save data
message (MQTTMessage): The received message. Needs to be decoded
def transform(data: dict) -> Point:
"""Transform data to InfluxDB point
"""
write_api = userdata.write_api(SYNCHRONOUS)
if DEBUG_MODE: print('Begin message processing')

payload = message.payload.decode('utf-8')
extract = json.loads(payload)
if DEBUG_MODE: print('Decoded and extracted data')

point = (
return (
Point('weather').tag('city', 'HCMC').tag('campus', 'A35')
.field('humidity', extract['humidity'])
.field('pressure', extract['pressure'])
.field('temperature', extract['temperature'])
.field('rainfall_day', extract['rainfall_day'])
.field('rainfall_hour', extract['rainfall_hour'])
.field('wind_speed_avg', extract['wind_speed_avg'])
.field('wind_speed_max', extract['wind_speed_max'])
.field('wind_direction', extract['wind_direction'])
.field('humidity', data['humidity'])
.field('pressure', data['pressure'])
.field('temperature', data['temperature'])
.field('rainfall_day', data['rainfall_day'])
.field('rainfall_hour', data['rainfall_hour'])
.field('wind_speed_avg', data['wind_speed_avg'])
.field('wind_speed_max', data['wind_speed_max'])
.field('wind_direction', data['wind_direction'])
)


def load(data: Point, database: InfluxDBClient):
"""Load data to InfluxDB
"""
try:
write_api.write('weather_data', record = point)
if DEBUG_MODE: print('Uploaded data')
write_api = database.write_api(SYNCHRONOUS)
write_api.write('weather_data', record = data)
except ApiException:
print('Connection to InfluxDB is unauthorised')
logging.error('3. Error: Connection to InfluxDB is unauthorised')
except:
print('Something wrong happened')
logging.error('3. Error: Something wrong happened')


# MARK: Connection callback
def on_connect(client: Client, userdata, flags, reason_code, properties):
logging.info('Connected to broker')

client.subscribe(TOPIC)
logging.info('Subscribed to topic: %s', TOPIC)


# MARK: Message callback
def on_message(client: Client, userdata: InfluxDBClient, message: MQTTMessage):
if DEBUG_MODE: logging.info('1. Extracting message')
extracted = extract(message)

if DEBUG_MODE: logging.info('2. Transforming to InfluxDB point')
transformed = transform(extracted)

if DEBUG_MODE: logging.info('3. Loading to InfluxDB')
load(transformed, userdata)


# MARK: Main procedure
def main():
"""Main procedure
"""
write_client = InfluxDBClient.from_env_properties()
database_client = InfluxDBClient.from_env_properties()

print("Entering credentials")
logging.info("Entering credentials")
mqtt_client = Client(CallbackAPIVersion.VERSION2, CLIENT_ID)
mqtt_client.user_data_set(database_client)
# mqtt_client.username_pw_set(USERNAME, PASSWORD)
mqtt_client.user_data_set(write_client)

print("Setting up callback functionality")
logging.info("Setting up callback functionality")
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

print("Connecting to broker")
logging.info("Connecting to broker")
mqtt_client.connect(BROKER_HOST, BROKER_PORT)

print("Connected! Start processing incoming data")
logging.info("Connected! Start processing incoming data")
mqtt_client.loop_forever()

return 0
Expand Down
18 changes: 9 additions & 9 deletions mock-station/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import json, random, time
import json, random, time, logging
from paho.mqtt import client, enums


Expand All @@ -21,23 +21,23 @@ def on_connect(client: client.Client, userdata, flags, reason_code, properties):
reason_code (_type_): _description_
properties (_type_): _description_
"""
print('Connected to broker')
logging.info('Connected to broker')


# MARK: Generate data
def generate_data(client: client.Client):
mock_data: dict[str, int | float] = {
'humidity': random.randint(0, 100),
'pressure': random.randint(100000, 102000),
'humidity': random.randint(0, 99),
'pressure': random.randrange(100000, 102000, 10),
'temperature': random.uniform(10, 40),
'rainfall_day': random.uniform(0, 10),
'rainfall_hour': random.uniform(0, 10),
'wind_speed_avg': random.uniform(0, 60),
'wind_speed_max': random.uniform(0, 60),
'wind_direction': random.randint(0, 360)
'wind_direction': random.randint(0, 359)
}

if DEBUG_MODE: print(mock_data)
if DEBUG_MODE: logging.debug(mock_data)
client.publish(TOPIC, json.dumps(mock_data))


Expand All @@ -47,13 +47,13 @@ def main():
"""
mqtt_client = client.Client(enums.CallbackAPIVersion.VERSION2, CLIENT_ID)

print("Setting up callback functionality")
logging.info("Setting up callback functionality")
mqtt_client.on_connect = on_connect

print("Connecting to broker")
logging.info("Connecting to broker")
mqtt_client.connect(BROKER_HOST, BROKER_PORT)

print("Connected! Start generating weather data")
logging.info("Connected! Start generating weather data")
while True:
generate_data(mqtt_client)
time.sleep(5)
Expand Down

0 comments on commit a3c0b26

Please sign in to comment.