Add sqlserver database adapter
joaquingx committed Jan 25, 2024
1 parent 702bdd6 commit 06d3410
Showing 25 changed files with 314 additions and 84 deletions.
Empty file added __init__.py
Empty file.
Empty file added database_adapters/__init__.py
Empty file.
132 changes: 105 additions & 27 deletions database_adapters/db_adapters.py
@@ -1,68 +1,77 @@
from abc import ABCMeta, abstractmethod
import logging

import pymongo
import pyodbc
import threading
from bson.objectid import ObjectId
from pymongo.errors import ConnectionFailure, PyMongoError

logger = logging.getLogger(__name__)


class InsertionResponse:
def __init__(self, ok, exception=None, need_upsert=False):
self.ok = ok
self.need_upsert = need_upsert
self.error = None if (ok or exception is None) else exception.__class__.__name__
self.error = None if (ok or exception is None) else f"{exception.__class__.__name__}: {str(exception)}"

class DatabaseReaderInterface(metaclass=ABCMeta):
"""Database adapter that just support read operations."""

class DatabaseInterface(metaclass=ABCMeta):
@abstractmethod
def get_connection(self):
pass

@abstractmethod
def delete_collection_data(self):
def get_all_dataset_data(self):
pass

@abstractmethod
def get_all_collection_data(self):
def get_chunked_dataset_data(self):
pass

@abstractmethod
def get_chunked_collection_data(self):
def get_paginated_dataset_data(self):
pass

@abstractmethod
def get_paginated_collection_data(self):
def get_estimated_item_count(self):
pass

@abstractmethod
def get_estimated_document_count(self):
def get_estimated_item_size(self):
pass

@abstractmethod
def get_estimated_document_size(self):
def get_database_size(self):
pass

class DatabaseWriterInterface(metaclass=ABCMeta):
"""Database adapter that just support write operations."""

@abstractmethod
def insert_one_to_unique_collection(self):
def get_connection(self):
pass

@abstractmethod
def insert_one_to_collection(self):
def delete_dataset_data(self):
pass

@abstractmethod
def insert_many_to_collection(self):
def insert_one_to_unique_dataset(self):
pass

@abstractmethod
def get_database_size(self):
def insert_one_to_dataset(self):
pass

@abstractmethod
def get_collection_size(self):
def insert_many_to_dataset(self):
pass



class MongoAdapter(DatabaseInterface):
class MongoAdapter(DatabaseWriterInterface, DatabaseReaderInterface):
def __init__(self, mongo_connection, mongo_production, mongo_certificate_path):
self.mongo_connection = mongo_connection
self.mongo_production = mongo_production
@@ -85,7 +94,7 @@ def get_connection(self):
self.client = client
return True

def delete_collection_data(self, database_name, collection_name):
def delete_dataset_data(self, database_name, collection_name):
collection = self.client[database_name][collection_name]
try:
collection.drop()
@@ -94,17 +103,17 @@ def delete_collection_data(self, database_name, collection_name):
print(ex)
return False

def get_collection_data(self, database_name, collection_name, limit=10000):
def get_dataset_data(self, database_name, collection_name, limit=10000):
collection = self.client[database_name][collection_name]
result = collection.find({}, {"_id": False}).limit(limit)
return list(result)

def get_all_collection_data(self, database_name, collection_name):
def get_all_dataset_data(self, database_name, collection_name):
collection = self.client[database_name][collection_name]
result = collection.find({}, {"_id": False})
return list(result)

def get_chunked_collection_data(
def get_chunked_dataset_data(
self, database_name, collection_name, chunk_size, current_chunk=None
):
collection = self.client[database_name][collection_name]
@@ -131,7 +140,7 @@ def get_jobs_set_stats(self, database_name, jobs_ids):
)
return list(result)

def get_paginated_collection_data(
def get_paginated_dataset_data(
self, database_name, collection_name, page, page_size
):
collection = self.client[database_name][collection_name]
@@ -147,16 +156,16 @@ def update_document(self, database_name, collection_name, document_id, new_field
result = collection.update_one({"_id": document_id}, {"$set": new_field})
return result.acknowledged

def get_estimated_document_count(self, database_name, collection_name):
def get_estimated_item_count(self, database_name, collection_name):
collection = self.client[database_name][collection_name]
return collection.estimated_document_count()

def get_estimated_document_size(self, database_name, collection_name):
def get_estimated_item_size(self, database_name, collection_name):
database = self.client[database_name]
document_size = database.command("collstats", collection_name)["avgObjSize"]
return document_size

def insert_one_to_unique_collection(self, database_name, collection_name, item):
def insert_one_to_unique_dataset(self, database_name, collection_name, item):
response = None
try:
self.client[database_name][collection_name].update_one(
@@ -168,7 +177,7 @@ def insert_one_to_unique_collection(self, database_name, collection_name, item):
finally:
return response

def insert_one_to_collection(self, database_name, collection_name, item):
def insert_one_to_dataset(self, database_name, collection_name, item):
response = None
try:
self.client[database_name][collection_name].insert_one(item)
@@ -178,7 +187,7 @@ def insert_one_to_collection(self, database_name, collection_name, item):
finally:
return response

def insert_many_to_collection(
def insert_many_to_dataset(
self, database_name, collection_name, items, ordered=False
):
response = None
@@ -198,17 +207,86 @@ def get_database_size(self, database_name, data_type):
total_size_bytes = 0
for collection in collections:
if data_type in collection:
total_size_bytes += self.get_collection_size(database_name, collection)
total_size_bytes += self.get_dataset_size(database_name, collection)
return total_size_bytes

def get_collection_size(self, database_name, collection_name):
def get_dataset_size(self, database_name, collection_name):
database = self.client[database_name]
collection_size = database.command("collstats", collection_name)["size"]
return collection_size

class SqlServerWriterAdapter(DatabaseWriterInterface):

def __init__(self, connection_string, production, certificate_path):
self.connection_string = connection_string
self.local_storage = threading.local()
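# One pyodbc connection per thread: get_connection() lazily creates it on this thread-local object.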

def get_connection(self):
if not hasattr(self.local_storage, 'connection'):
try:
self.local_storage.connection = pyodbc.connect(self.connection_string)
return True
except Exception as e:
print(f"Error connecting to SQL Server: {e}")
return False
return True

def _execute_query(self, database_name, query, values=(), execute_many=False):
if not self.get_connection():
return False, "Connection Error"

try:
with self.local_storage.connection.cursor() as cursor:
logger.debug("Executing query: %s", query)
if not execute_many:
cursor.execute(f"USE {database_name}")
cursor.execute(query, values)
else:
cursor.execute(f"USE {database_name}")
cursor.executemany(query, values)
self.local_storage.connection.commit()
return True, None
except pyodbc.Error as e:
self.local_storage.connection.rollback()
logger.debug("Error executing query: %s", query)
return False, e

def insert_one_to_dataset(self, database_name, table_name, item):
# Transform the item into a valid SQL insert: keys become column names, values are bound as placeholders.
columns = ', '.join(item.keys())
placeholders = ', '.join('?' * len(item))
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
response, ex = self._execute_query(database_name, query, values=list(item.values()))
return InsertionResponse(response, ex)

def insert_many_to_dataset(self, database_name, table_name, items):
columns = ', '.join(items[0].keys())
placeholders = ', '.join('?' * len(items[0]))
logger.debug("items :%s", str(items))
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
values_to_insert = [tuple(item.values()) for item in items]
logger.debug("values to insert: %s", str(values_to_insert))
response, ex = self._execute_query(database_name, query, values_to_insert, execute_many=True)
# no upsert needed as executemany is committed or rolled back as a single batch
return InsertionResponse(response, ex)

def delete_dataset_data(self, database_name, table_name):
query = f"DELETE FROM {table_name}"
response, ex = self._execute_query(database_name, query)
return InsertionResponse(response, ex)

def insert_one_to_unique_dataset(self, database_name, table_name, item): # Needs more discussion.
return self.insert_one_to_dataset(database_name, table_name, item)

def get_database_interface(engine, connection, production, certificate_path):
database_interfaces = {
"mongodb": MongoAdapter(connection, production, certificate_path),
}
return database_interfaces[engine]

def get_database_writer_interface(engine, connection, production, certificate_path):
database_interfaces = {
"mongodb": MongoAdapter(connection, production, certificate_path),
"sqlserver": SqlServerWriterAdapter(connection, production, certificate_path),
}
return database_interfaces[engine]
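For context, a minimal usage sketch of the new writer path wired up above. The connection string, database, and table names below are placeholders for illustration, not values from this commit:

from database_adapters.db_adapters import get_database_writer_interface

# Hypothetical pyodbc connection string; adjust driver, host, and credentials to your setup.
connection_string = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=localhost,1433;UID=sa;PWD=example;TrustServerCertificate=yes"
)

writer = get_database_writer_interface(
    "sqlserver", connection_string, production=False, certificate_path=None
)

if writer.get_connection():
    # Keys become column names; values are bound through `?` placeholders.
    response = writer.insert_one_to_dataset(
        "example_db", "job_items", {"url": "https://example.com", "status": "ok"}
    )
    if not response.ok:
        print("insert failed:", response.error)

    # Batch insert goes through cursor.executemany; all items must share the same keys.
    batch = [
        {"url": "https://example.com/1", "status": "ok"},
        {"url": "https://example.com/2", "status": "ok"},
    ]
    print(writer.insert_many_to_dataset("example_db", "job_items", batch).ok)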
Empty file.
72 changes: 72 additions & 0 deletions database_adapters/tests/test_db_adapters.py
@@ -0,0 +1,72 @@
import pytest
from unittest.mock import Mock, patch
import pyodbc
from database_adapters.db_adapters import SqlServerWriterAdapter, InsertionResponse

class TestSqlServerWriterAdapter:

@pytest.fixture
def mock_pyodbc_connect(self):
with patch('database_adapters.db_adapters.pyodbc.connect') as mock_connect:
mock_connect.return_value.cursor.return_value.__enter__.return_value.execute = Mock()
mock_connect.return_value.cursor.return_value.__enter__.return_value.executemany = Mock()
yield mock_connect

@pytest.fixture
def writer_adapter(self, mock_pyodbc_connect):
return SqlServerWriterAdapter("dummy_connection_string", production=False, certificate_path=None)

def test_get_connection_success(self, writer_adapter):
assert writer_adapter.get_connection() == True

def test_get_connection_failure(self, mock_pyodbc_connect, writer_adapter):
mock_pyodbc_connect.side_effect = Exception("Connection Error")
assert writer_adapter.get_connection() == False

def test_execute_query_success(self, writer_adapter):
connection = writer_adapter.get_connection()
assert writer_adapter._execute_query("test_db", "SELECT * FROM test_table") == (True, None)

def test_execute_query_failure(self, mock_pyodbc_connect, writer_adapter):
connection = writer_adapter.get_connection()
mock_pyodbc_connect.return_value.cursor.side_effect = pyodbc.DatabaseError("Error")

# Execute the query and capture the result
result, error = writer_adapter._execute_query("test_db", "SELECT * FROM test_table")
assert result == False
assert isinstance(error, pyodbc.DatabaseError)
assert str(error) == "Error"

def test_execute_query_with_execute_many(self, writer_adapter):
connection = writer_adapter.get_connection()
assert writer_adapter._execute_query("test_db", "INSERT INTO test_table VALUES (?)", values=[(1,),(2,)], execute_many=True) == (True, None)

def test_insert_one_to_dataset(self, writer_adapter):
item = {"col1": "val1", "col2": "val2"}
connection = writer_adapter.get_connection()
response = writer_adapter.insert_one_to_dataset("test_db", "test_table", item)
assert isinstance(response, InsertionResponse)
assert response.ok == True

def test_insert_many_to_dataset(self, writer_adapter):
items = [{"col1": "val1", "col2": "val2"}, {"col1": "val3", "col2": "val4"}]
connection = writer_adapter.get_connection()
response = writer_adapter.insert_many_to_dataset("test_db", "test_table", items)
assert isinstance(response, InsertionResponse)
assert response.ok == True
assert not response.error
assert not response.need_upsert

def test_delete_dataset_data(self, writer_adapter):
connection = writer_adapter.get_connection()
response = writer_adapter.delete_dataset_data("test_db", "test_table")
assert response.ok == True
assert not response.error
assert not response.need_upsert

def test_insert_one_to_unique_dataset(self, writer_adapter):
connection = writer_adapter.get_connection()
item = {"col1": "val1", "col2": "val2"}
response = writer_adapter.insert_one_to_unique_dataset("test_db", "test_table", item)
assert isinstance(response, InsertionResponse)
assert response.ok == True
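These tests patch pyodbc.connect, so they run without a SQL Server instance. A minimal sketch for running just this module from the repository root (assuming pytest and pyodbc are installed):

import pytest

if __name__ == "__main__":
    # Run only the new adapter tests; pytest.main returns an exit code.
    raise SystemExit(pytest.main(["database_adapters/tests/test_db_adapters.py", "-v"]))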
13 changes: 7 additions & 6 deletions development-helps/Makefile
@@ -8,6 +8,7 @@ REGISTRY_HOST = localhost:5001
API_POD = $$(kubectl get pod -l app=estela-django-api -o jsonpath="{.items[0].metadata.name}")
API_DOC = docs/api.yaml
PLATFORM ?= linux/$(shell uname -m)
M1_LOCAL ?= true

.PHONY: start
start:
@@ -22,37 +23,37 @@ stop:
.PHONY: update-api-image
update-api-image:
-cd $(API_DIR) && \
docker build .. --file docker-conf/Dockerfile-django-api --tag $(REGISTRY_HOST)/estela-django-api:latest --platform $(PLATFORM)
docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-django-api --tag $(REGISTRY_HOST)/estela-django-api:latest --platform $(PLATFORM)
-docker push $(REGISTRY_HOST)/estela-django-api:latest


.PHONY: update-celery-image
update-celery-image:
-cd $(API_DIR) && \
docker build .. --file docker-conf/Dockerfile-celery-beat --tag $(REGISTRY_HOST)/estela-celery-beat:latest --platform $(PLATFORM) && \
docker build .. --file docker-conf/Dockerfile-celery-worker --tag $(REGISTRY_HOST)/estela-celery-worker:latest --platform $(PLATFORM)
docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-celery-beat --tag $(REGISTRY_HOST)/estela-celery-beat:latest --platform $(PLATFORM) && \
docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-celery-worker --tag $(REGISTRY_HOST)/estela-celery-worker:latest --platform $(PLATFORM)
-docker push $(REGISTRY_HOST)/estela-celery-beat:latest
-docker push $(REGISTRY_HOST)/estela-celery-worker:latest


.PHONY: update-redis-image
update-redis-image:
-cd $(API_DIR) && \
docker build . --file docker-conf/Dockerfile-redis --tag $(REGISTRY_HOST)/estela-redis:latest --platform $(PLATFORM)
docker build . --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-redis --tag $(REGISTRY_HOST)/estela-redis:latest --platform $(PLATFORM)
-docker push $(REGISTRY_HOST)/estela-redis:latest


.PHONY: update-build-project-image
update-build-project-image:
-cd $(API_DIR) && \
docker build .. --file docker-conf/Dockerfile-build-project --tag $(REGISTRY_HOST)/estela-build-project:latest --platform $(PLATFORM)
docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-build-project --tag $(REGISTRY_HOST)/estela-build-project:latest --platform $(PLATFORM)
-docker push $(REGISTRY_HOST)/estela-build-project:latest


.PHONY: update-consumer-image
update-consumer-image:
-cd $(QUEUING_DIR) && \
docker build .. --file Dockerfile --tag $(REGISTRY_HOST)/estela-consumer:latest --platform $(PLATFORM)
docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file Dockerfile --tag $(REGISTRY_HOST)/estela-consumer:latest --platform $(PLATFORM)
-docker push $(REGISTRY_HOST)/estela-consumer:latest


2 changes: 1 addition & 1 deletion estela-api/api/utils.py
@@ -62,7 +62,7 @@ def update_stats_from_redis(job, save_to_database=False):

job_collection_name = "{}-{}-job_stats".format(job.spider.sid, job.jid)
job_stats["_id"] = job_collection_name
spiderdata_db_client.insert_one_to_collection(
spiderdata_db_client.insert_one_to_dataset(
str(job.spider.project.pid), "job_stats", job_stats
)
