-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnews_db.py
118 lines (101 loc) · 2.96 KB
/
news_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import logging
import sqlite3
from pathlib import Path
from sqlite3 import Error
# Logging
# Configure the root logger once, at import time. A single call is enough:
# level=INFO already lets ERROR records through, and logging.basicConfig()
# does nothing once the root logger has a handler, so a second call with
# level=ERROR would never take effect.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
)
class NewsDatabase:
    """
    Helper around the SQLite ``news`` table.

    The class stores no connection of its own: every method receives the
    connection to work on as an argument. The SQL statements it executes
    are kept as class-level constants.
    """

    # Default database file name. No method of this class reads it;
    # create_connection() takes the path as an argument.
    DATABASE_NAME = Path("news_database.db")

    # SQL queries
    CREATE_TABLE_SQL = """
        CREATE TABLE IF NOT EXISTS news (
            author TEXT,
            title TEXT,
            description TEXT,
            url TEXT,
            pub_date TEXT
        );
    """
    INSERT_INTO_SQL = """
        INSERT INTO news
        (author,title,description,url,pub_date)
        VALUES(?,?,?,?,?);
    """
    SELECT_FROM_SQL = """
        SELECT * FROM news;
    """
    DELETE_FROM_SQL = """
        DELETE FROM news;
    """
    SELECT_COUNT_SQL = """
        SELECT COUNT(*) FROM news;
    """

    def check_entities_count(self, conn: sqlite3.Connection) -> int:
        """
        Count the rows currently stored in the news table
        :param conn: Connection to the SQLite database
        :return: number of rows in the news table
        """
        return conn.cursor().execute(self.SELECT_COUNT_SQL).fetchone()[0]

    def create_connection(self, db_file: Path):
        """
        Open a connection to the SQLite database at db_file
        (sqlite3.connect creates the file when it does not exist yet)
        :param db_file: path to db file to open or create
        :return: the open connection, or None when sqlite3 raised an Error
        """
        # Only the connect call can raise here, so keep the try body minimal.
        try:
            conn = sqlite3.connect(db_file)
        except Error as create_conn_err:
            logging.error(create_conn_err)
            return None
        logging.info("Connection created successfully !")
        return conn

    def create_table(self, conn: sqlite3.Connection) -> None:
        """
        Create the news table unless it already exists
        A sqlite3 Error is logged and swallowed, not raised.
        :param conn: Connection to the SQLite database
        :return: None
        """
        try:
            conn.cursor().execute(self.CREATE_TABLE_SQL)
        except Error as create_table_err:
            logging.error(create_table_err)
        else:
            logging.info("Table created successfully !")

    def insert_into(self, conn: sqlite3.Connection, data: list):
        """
        Insert data to base
        This is the load step in ETL pipeline
        :param conn: Connection to the SQLite database
        :param data: the five values bound to the INSERT placeholders, in
            the order author, title, description, url, pub_date
        :return: rowid of the inserted row, or None when sqlite3 raised an
            Error (the error is logged, not re-raised)
        """
        try:
            cur = conn.cursor()
            cur.execute(self.INSERT_INTO_SQL, data)
            conn.commit()
            # The row count is queried after the commit and inside the try,
            # so a failure of the count query is also logged and reported as
            # None even though the row has already been committed.
            logging.info(
                "Data inserted successfully ! Entities in db for now: %s",
                self.check_entities_count(conn),
            )
            return cur.lastrowid
        except Error as insert_err:
            logging.error(insert_err)
            return None

    def get_all_news(self, conn: sqlite3.Connection) -> list:
        """
        Query all rows in the news table
        :param conn: Connection to the SQLite database
        :return: list with one tuple per row (all columns)
        """
        return conn.cursor().execute(self.SELECT_FROM_SQL).fetchall()

    async def delete_all_news(self, conn: sqlite3.Connection) -> None:
        """
        Delete all rows in the news table
        NOTE: this is a coroutine function that contains no await. Calling it
        only creates a coroutine object; the rows are deleted once that
        coroutine is awaited (or run by an event loop). sqlite3 errors are
        not caught here and propagate to the caller.
        :param conn: Connection to the SQLite database
        :return: None
        """
        conn.cursor().execute(self.DELETE_FROM_SQL)
        conn.commit()
        logging.info(
            "Database was purged successfully ! Entities in db for now: %s",
            self.check_entities_count(conn),
        )
if __name__ == "__main__":
    # Placeholder entry point: the guarded body is intentionally a no-op.
    pass