From 06d3410d26b02d77678de699e1b36f04606b86c8 Mon Sep 17 00:00:00 2001 From: Joaquin Date: Thu, 25 Jan 2024 16:07:20 -0500 Subject: [PATCH] Add sqlserver database adapter --- __init__.py | 0 database_adapters/__init__.py | 0 database_adapters/db_adapters.py | 132 ++++++++++++++---- database_adapters/tests/__init__.py | 0 database_adapters/tests/test_db_adapters.py | 72 ++++++++++ development-helps/Makefile | 13 +- estela-api/api/utils.py | 2 +- estela-api/api/views/job_data.py | 14 +- estela-api/core/tasks.py | 10 +- .../docker-conf/Dockerfile-build-project | 8 ++ estela-api/docker-conf/Dockerfile-celery-beat | 9 ++ .../docker-conf/Dockerfile-celery-worker | 9 ++ estela-api/docker-conf/Dockerfile-django-api | 8 ++ estela-api/requirements/base.in | 1 + estela-api/requirements/base.txt | 4 +- estela-api/requirements/deploy.txt | 6 +- estela-api/requirements/test.txt | 10 +- installation/Makefile | 5 +- queueing/Dockerfile | 8 +- queueing/config/__init__.py | 0 queueing/config/database_manager.py | 4 +- queueing/consumer.py | 39 ++++-- queueing/inserter.py | 27 ++-- queueing/requirements/consumer.in | 1 + queueing/requirements/consumer.txt | 16 ++- 25 files changed, 314 insertions(+), 84 deletions(-) create mode 100644 __init__.py create mode 100644 database_adapters/__init__.py create mode 100644 database_adapters/tests/__init__.py create mode 100644 database_adapters/tests/test_db_adapters.py create mode 100644 queueing/config/__init__.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/database_adapters/__init__.py b/database_adapters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/database_adapters/db_adapters.py b/database_adapters/db_adapters.py index d8ed661a..b89cbb00 100644 --- a/database_adapters/db_adapters.py +++ b/database_adapters/db_adapters.py @@ -1,68 +1,77 @@ from abc import ABCMeta, abstractmethod +import logging import pymongo +import pyodbc +import threading from bson.objectid import ObjectId from pymongo.errors import ConnectionFailure, PyMongoError +logger = logging.getLogger(__name__) + class InsertionResponse: def __init__(self, ok, exception=None, need_upsert=False): self.ok = ok self.need_upsert = need_upsert - self.error = None if (ok or exception is None) else exception.__class__.__name__ + self.error = None if (ok or exception is None) else f"{exception.__class__.__name__}: {str(exception)}" +class DatabaseReaderInterface(metaclass=ABCMeta): + """Interface for database adapters that only support read operations.""" -class DatabaseInterface(metaclass=ABCMeta): @abstractmethod def get_connection(self): pass @abstractmethod - def delete_collection_data(self): + def get_all_dataset_data(self): pass @abstractmethod - def get_all_collection_data(self): + def get_chunked_dataset_data(self): pass @abstractmethod - def get_chunked_collection_data(self): + def get_paginated_dataset_data(self): pass @abstractmethod - def get_paginated_collection_data(self): + def get_estimated_item_count(self): pass @abstractmethod - def get_estimated_document_count(self): + def get_estimated_item_size(self): pass @abstractmethod - def get_estimated_document_size(self): + def get_database_size(self): pass +class DatabaseWriterInterface(metaclass=ABCMeta): + """Interface for database adapters that only support write operations.""" + @abstractmethod - def insert_one_to_unique_collection(self): + def get_connection(self): pass @abstractmethod - def insert_one_to_collection(self): + def delete_dataset_data(self): pass @abstractmethod - def
insert_many_to_collection(self): + def insert_one_to_unique_dataset(self): pass @abstractmethod - def get_database_size(self): + def insert_one_to_dataset(self): pass @abstractmethod - def get_collection_size(self): + def insert_many_to_dataset(self): pass + - -class MongoAdapter(DatabaseInterface): +class MongoAdapter(DatabaseWriterInterface, DatabaseReaderInterface): def __init__(self, mongo_connection, mongo_production, mongo_certificate_path): self.mongo_connection = mongo_connection self.mongo_production = mongo_production @@ -85,7 +94,7 @@ def get_connection(self): self.client = client return True - def delete_collection_data(self, database_name, collection_name): + def delete_dataset_data(self, database_name, collection_name): collection = self.client[database_name][collection_name] try: collection.drop() @@ -94,17 +103,17 @@ def delete_collection_data(self, database_name, collection_name): print(ex) return False - def get_collection_data(self, database_name, collection_name, limit=10000): + def get_dataset_data(self, database_name, collection_name, limit=10000): collection = self.client[database_name][collection_name] result = collection.find({}, {"_id": False}).limit(limit) return list(result) - def get_all_collection_data(self, database_name, collection_name): + def get_all_dataset_data(self, database_name, collection_name): collection = self.client[database_name][collection_name] result = collection.find({}, {"_id": False}) return list(result) - def get_chunked_collection_data( + def get_chunked_dataset_data( self, database_name, collection_name, chunk_size, current_chunk=None ): collection = self.client[database_name][collection_name] @@ -131,7 +140,7 @@ def get_jobs_set_stats(self, database_name, jobs_ids): ) return list(result) - def get_paginated_collection_data( + def get_paginated_dataset_data( self, database_name, collection_name, page, page_size ): collection = self.client[database_name][collection_name] @@ -147,16 +156,16 @@ def update_document(self, database_name, collection_name, document_id, new_field result = collection.update_one({"_id": document_id}, {"$set": new_field}) return result.acknowledged - def get_estimated_document_count(self, database_name, collection_name): + def get_estimated_item_count(self, database_name, collection_name): collection = self.client[database_name][collection_name] return collection.estimated_document_count() - def get_estimated_document_size(self, database_name, collection_name): + def get_estimated_item_size(self, database_name, collection_name): database = self.client[database_name] document_size = database.command("collstats", collection_name)["avgObjSize"] return document_size - def insert_one_to_unique_collection(self, database_name, collection_name, item): + def insert_one_to_unique_dataset(self, database_name, collection_name, item): response = None try: self.client[database_name][collection_name].update_one( @@ -168,7 +177,7 @@ def insert_one_to_unique_collection(self, database_name, collection_name, item): finally: return response - def insert_one_to_collection(self, database_name, collection_name, item): + def insert_one_to_dataset(self, database_name, collection_name, item): response = None try: self.client[database_name][collection_name].insert_one(item) @@ -178,7 +187,7 @@ def insert_one_to_collection(self, database_name, collection_name, item): finally: return response - def insert_many_to_collection( + def insert_many_to_dataset( self, database_name, collection_name, items, ordered=False ): response = None @@ -198,17 
+207,86 @@ def get_database_size(self, database_name, data_type): total_size_bytes = 0 for collection in collections: if data_type in collection: - total_size_bytes += self.get_collection_size(database_name, collection) + total_size_bytes += self.get_dataset_size(database_name, collection) return total_size_bytes - def get_collection_size(self, database_name, collection_name): + def get_dataset_size(self, database_name, collection_name): database = self.client[database_name] collection_size = database.command("collstats", collection_name)["size"] return collection_size +class SqlServerWriterAdapter(DatabaseWriterInterface): + + def __init__(self, connection_string, production, certificate_path): + self.connection_string = connection_string + self.local_storage = threading.local() + + def get_connection(self): + if not hasattr(self.local_storage, 'connection'): + try: + self.local_storage.connection = pyodbc.connect(self.connection_string) + return True + except Exception as e: + print(f"Error connecting to SQL Server: {e}") + return False + return True + + def _execute_query(self, database_name, query, values=(), execute_many=False): + if not self.get_connection(): + return False, "Connection Error" + + try: + with self.local_storage.connection.cursor() as cursor: + logger.debug("Executing query: %s", query) + if not execute_many: + cursor.execute(f"USE {database_name}") + cursor.execute(query, values) + else: + cursor.execute(f"USE {database_name}") + cursor.executemany(query, values) + self.local_storage.connection.commit() + return True, None + except pyodbc.Error as e: + self.local_storage.connection.rollback() + logger.debug("Error executing query: %s", query) + return False, e + + def insert_one_to_dataset(self, database_name, table_name, item): + # It should transform the item into a valid SQL row. + columns = ', '.join(item.keys()) + placeholders = ', '.join('?' * len(item)) + query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" + response, ex = self._execute_query(database_name, query, values=list(item.values())) + return InsertionResponse(response, ex) + + def insert_many_to_dataset(self, database_name, table_name, items): + columns = ', '.join(items[0].keys()) + placeholders = ', '.join('?' * len(items[0])) + logger.debug("items: %s", str(items)) + query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" + values_to_insert = [tuple(item.values()) for item in items] + logger.debug("values to insert: %s", str(values_to_insert)) + response, ex = self._execute_query(database_name, query, values_to_insert, execute_many=True) + # No upsert needed as executemany runs in a single transaction. + return InsertionResponse(response, ex) + + def delete_dataset_data(self, database_name, table_name): + query = f"DELETE FROM {table_name}" + response, ex = self._execute_query(database_name, query) + return InsertionResponse(response, ex) + + def insert_one_to_unique_dataset(self, database_name, table_name, item): # Needs more discussion.
+ return self.insert_one_to_dataset(database_name, table_name, item) def get_database_interface(engine, connection, production, certificate_path): database_interfaces = { "mongodb": MongoAdapter(connection, production, certificate_path), } return database_interfaces[engine] + +def get_database_writer_interface(engine, connection, production, certificate_path): + database_interfaces = { + "mongodb": MongoAdapter(connection, production, certificate_path), + "sqlserver": SqlServerWriterAdapter(connection, production, certificate_path), + } + return database_interfaces[engine] diff --git a/database_adapters/tests/__init__.py b/database_adapters/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/database_adapters/tests/test_db_adapters.py b/database_adapters/tests/test_db_adapters.py new file mode 100644 index 00000000..bd960555 --- /dev/null +++ b/database_adapters/tests/test_db_adapters.py @@ -0,0 +1,72 @@ +import pytest +from unittest.mock import Mock, patch +import pyodbc +from database_adapters.db_adapters import SqlServerWriterAdapter, InsertionResponse + +class TestSqlServerWriterAdapter: + + @pytest.fixture + def mock_pyodbc_connect(self): + with patch('database_adapters.db_adapters.pyodbc.connect') as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value.execute = Mock() + mock_connect.return_value.cursor.return_value.__enter__.return_value.executemany = Mock() + yield mock_connect + + @pytest.fixture + def writer_adapter(self, mock_pyodbc_connect): + return SqlServerWriterAdapter("dummy_connection_string", False, None) + + def test_get_connection_success(self, writer_adapter): + assert writer_adapter.get_connection() == True + + def test_get_connection_failure(self, mock_pyodbc_connect, writer_adapter): + mock_pyodbc_connect.side_effect = Exception("Connection Error") + assert writer_adapter.get_connection() == False + + def test_execute_query_success(self, writer_adapter): + connection = writer_adapter.get_connection() + assert writer_adapter._execute_query("test_db", "SELECT * FROM test_table") == (True, None) + + def test_execute_query_failure(self, mock_pyodbc_connect, writer_adapter): + connection = writer_adapter.get_connection() + mock_pyodbc_connect.return_value.cursor.side_effect = pyodbc.DatabaseError("Error") + + # Execute the query and capture the result + result, error = writer_adapter._execute_query("test_db", "SELECT * FROM test_table") + assert result == False + assert isinstance(error, pyodbc.DatabaseError) + assert str(error) == "Error" + + def test_execute_query_with_execute_many(self, writer_adapter): + connection = writer_adapter.get_connection() + assert writer_adapter._execute_query("test_db", "INSERT INTO test_table VALUES (?)", values=[(1,),(2,)], execute_many=True) == (True, None) + + def test_insert_one_to_dataset(self, writer_adapter): + item = {"col1": "val1", "col2": "val2"} + connection = writer_adapter.get_connection() + response = writer_adapter.insert_one_to_dataset("test_db", "test_table", item) + assert isinstance(response, InsertionResponse) + assert response.ok == True + + def test_insert_many_to_dataset(self, writer_adapter): + items = [{"col1": "val1", "col2": "val2"}, {"col1": "val3", "col2": "val4"}] + connection = writer_adapter.get_connection() + response = writer_adapter.insert_many_to_dataset("test_db", "test_table", items) + assert isinstance(response, InsertionResponse) + assert response.ok == True + assert not response.error + assert not response.need_upsert + + def
test_delete_dataset_data(self, writer_adapter): + connection = writer_adapter.get_connection() + response = writer_adapter.delete_dataset_data("test_db", "test_table") + assert response.ok == True + assert not response.error + assert not response.need_upsert + + def test_insert_one_to_unique_dataset(self, writer_adapter): + connection = writer_adapter.get_connection() + item = {"col1": "val1", "col2": "val2"} + response = writer_adapter.insert_one_to_unique_dataset("test_db", "test_table", item) + assert isinstance(response, InsertionResponse) + assert response.ok == True diff --git a/development-helps/Makefile b/development-helps/Makefile index 263f1da5..a29792ed 100644 --- a/development-helps/Makefile +++ b/development-helps/Makefile @@ -8,6 +8,7 @@ REGISTRY_HOST = localhost:5001 API_POD = $$(kubectl get pod -l app=estela-django-api -o jsonpath="{.items[0].metadata.name}") API_DOC = docs/api.yaml PLATFORM ?= linux/$(shell uname -m) +M1_LOCAL ?= true .PHONY: start start: @@ -22,15 +23,15 @@ stop: .PHONY: update-api-image update-api-image: -cd $(API_DIR) && \ - docker build .. --file docker-conf/Dockerfile-django-api --tag $(REGISTRY_HOST)/estela-django-api:latest --platform $(PLATFORM) + docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-django-api --tag $(REGISTRY_HOST)/estela-django-api:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-django-api:latest .PHONY: update-celery-image update-celery-image: -cd $(API_DIR) && \ - docker build .. --file docker-conf/Dockerfile-celery-beat --tag $(REGISTRY_HOST)/estela-celery-beat:latest --platform $(PLATFORM) && \ - docker build .. --file docker-conf/Dockerfile-celery-worker --tag $(REGISTRY_HOST)/estela-celery-worker:latest --platform $(PLATFORM) + docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-celery-beat --tag $(REGISTRY_HOST)/estela-celery-beat:latest --platform $(PLATFORM) && \ + docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-celery-worker --tag $(REGISTRY_HOST)/estela-celery-worker:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-celery-beat:latest -docker push $(REGISTRY_HOST)/estela-celery-worker:latest @@ -38,21 +39,21 @@ update-celery-image: .PHONY: update-redis-image update-redis-image: -cd $(API_DIR) && \ - docker build . --file docker-conf/Dockerfile-redis --tag $(REGISTRY_HOST)/estela-redis:latest --platform $(PLATFORM) + docker build . --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-redis --tag $(REGISTRY_HOST)/estela-redis:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-redis:latest .PHONY: update-build-project-image update-build-project-image: -cd $(API_DIR) && \ - docker build .. --file docker-conf/Dockerfile-build-project --tag $(REGISTRY_HOST)/estela-build-project:latest --platform $(PLATFORM) + docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-build-project --tag $(REGISTRY_HOST)/estela-build-project:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-build-project:latest .PHONY: update-consumer-image update-consumer-image: -cd $(QUEUING_DIR) && \ - docker build .. --file Dockerfile --tag $(REGISTRY_HOST)/estela-consumer:latest --platform $(PLATFORM) + docker build .. 
--build-arg M1_LOCAL=${M1_LOCAL} --file Dockerfile --tag $(REGISTRY_HOST)/estela-consumer:latest --platform $(PLATFORM) -docker push $(REGISTRY_HOST)/estela-consumer:latest diff --git a/estela-api/api/utils.py b/estela-api/api/utils.py index 28e70049..802082d2 100644 --- a/estela-api/api/utils.py +++ b/estela-api/api/utils.py @@ -62,7 +62,7 @@ def update_stats_from_redis(job, save_to_database=False): job_collection_name = "{}-{}-job_stats".format(job.spider.sid, job.jid) job_stats["_id"] = job_collection_name - spiderdata_db_client.insert_one_to_collection( + spiderdata_db_client.insert_one_to_dataset( str(job.spider.project.pid), "job_stats", job_stats ) diff --git a/estela-api/api/views/job_data.py b/estela-api/api/views/job_data.py index b4681133..ebb71569 100644 --- a/estela-api/api/views/job_data.py +++ b/estela-api/api/views/job_data.py @@ -100,7 +100,7 @@ def list(self, request, *args, **kwargs): job = SpiderJob.objects.filter(jid=kwargs["jid"]).get() job_collection_name = self.get_collection_name(job, data_type) - count = spiderdata_db_client.get_estimated_document_count( + count = spiderdata_db_client.get_estimated_item_count( kwargs["pid"], job_collection_name ) @@ -119,12 +119,12 @@ def list(self, request, *args, **kwargs): chunk_size = max( 1, settings.MAX_CLI_DOWNLOAD_CHUNK_SIZE - // spiderdata_db_client.get_estimated_document_size( + // spiderdata_db_client.get_estimated_item_size( kwargs["pid"], job_collection_name ), ) current_chunk = request.query_params.get("current_chunk", None) - result, next_chunk = spiderdata_db_client.get_chunked_collection_data( + result, next_chunk = spiderdata_db_client.get_chunked_dataset_data( kwargs["pid"], job_collection_name, chunk_size, current_chunk ) response = {"count": count, "results": result} @@ -132,7 +132,7 @@ def list(self, request, *args, **kwargs): response["next_chunk"] = next_chunk return Response(response) else: - result = spiderdata_db_client.get_paginated_collection_data( + result = spiderdata_db_client.get_paginated_dataset_data( kwargs["pid"], job_collection_name, page, page_size ) @@ -207,11 +207,11 @@ def download(self, request, *args, **kwargs): docs_limit = max( 1, settings.MAX_WEB_DOWNLOAD_SIZE - // spiderdata_db_client.get_estimated_document_size( + // spiderdata_db_client.get_estimated_item_size( kwargs["pid"], job_collection_name ), ) - data = spiderdata_db_client.get_collection_data( + data = spiderdata_db_client.get_dataset_data( kwargs["pid"], job_collection_name, docs_limit ) @@ -243,7 +243,7 @@ def delete(self, request, *args, **kwargs): raise DataBaseError({"error": errors.UNABLE_CONNECT_DB}) job_collection_name = self.get_collection_name(job, data_type) - deleted_data = spiderdata_db_client.delete_collection_data( + deleted_data = spiderdata_db_client.delete_dataset_data( kwargs["pid"], job_collection_name ) chain_of_usage_process = get_chain_to_process_usage_data( diff --git a/estela-api/core/tasks.py b/estela-api/core/tasks.py index e479b3cd..3d3577dc 100644 --- a/estela-api/core/tasks.py +++ b/estela-api/core/tasks.py @@ -71,7 +71,7 @@ def delete_data(pid, sid, jid, data_type): else: job_collection_name = "{}-{}-job_{}".format(sid, jid, data_type) - spiderdata_db_client.delete_collection_data(pid, job_collection_name) + spiderdata_db_client.delete_dataset_data(pid, job_collection_name) @celery_app.task(name="core.tasks.launch_job") @@ -226,16 +226,16 @@ def record_project_usage_after_job_event(job_id): unique_collection = True else: items_collection_name = "{}-{}-job_items".format(job.spider.sid, job.jid) 
- items_data_size = spiderdata_db_client.get_collection_size( + items_data_size = spiderdata_db_client.get_dataset_size( str(project.pid), items_collection_name ) unique_collection = False requests_collection_name = "{}-{}-job_requests".format(job.spider.sid, job.jid) - requests_data_size = spiderdata_db_client.get_collection_size( + requests_data_size = spiderdata_db_client.get_dataset_size( str(project.pid), requests_collection_name ) logs_collection_name = "{}-{}-job_logs".format(job.spider.sid, job.jid) - logs_data_size = spiderdata_db_client.get_collection_size( + logs_data_size = spiderdata_db_client.get_dataset_size( str(project.pid), logs_collection_name ) # Tracking Proxy Usage @@ -309,7 +309,7 @@ def record_job_coverage_event(job_id): pid = job.spider.project.pid sid = job.spider.sid items_collection_name = f"{sid}-{job.jid}-job_items" - items: List[dict] = spiderdata_db_client.get_all_collection_data( + items: List[dict] = spiderdata_db_client.get_all_dataset_data( str(pid), items_collection_name ) total_items = len(items) diff --git a/estela-api/docker-conf/Dockerfile-build-project b/estela-api/docker-conf/Dockerfile-build-project index 3a2f996c..8b7d798c 100644 --- a/estela-api/docker-conf/Dockerfile-build-project +++ b/estela-api/docker-conf/Dockerfile-build-project @@ -2,6 +2,7 @@ FROM python:3.9 WORKDIR /home/estela +ARG M1_LOCAL=false COPY estela-api/requirements ./requirements ENV PYTHONDONTWRITEBYTECODE 1 @@ -25,6 +26,13 @@ RUN apt-get update RUN apt-get install docker-ce docker-ce-cli containerd.io -y +# If M1 chip is used, install pyodbc and unixodbc-dev +RUN if [ "$M1_LOCAL" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/deploy.txt; \ + fi + RUN pip install -r requirements/deploy.txt RUN if test -f "requirements/externalApps.txt"; then pip install -r requirements/externalApps.txt; fi diff --git a/estela-api/docker-conf/Dockerfile-celery-beat b/estela-api/docker-conf/Dockerfile-celery-beat index dc32547e..dfd00ef9 100644 --- a/estela-api/docker-conf/Dockerfile-celery-beat +++ b/estela-api/docker-conf/Dockerfile-celery-beat @@ -1,5 +1,7 @@ FROM python:3.9 +ARG M1_LOCAL=false + WORKDIR /home/estela COPY estela-api/requirements ./requirements @@ -7,6 +9,13 @@ COPY estela-api/requirements ./requirements ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 +# If M1 chip is used, install pyodbc and unixodbc-dev +RUN if [ "$M1_LOCAL" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/test.txt; \ + fi + RUN pip install -r requirements/test.txt RUN if test -f "requirements/externalApps.txt"; then pip install -r requirements/externalApps.txt; fi diff --git a/estela-api/docker-conf/Dockerfile-celery-worker b/estela-api/docker-conf/Dockerfile-celery-worker index dc32547e..df80592f 100644 --- a/estela-api/docker-conf/Dockerfile-celery-worker +++ b/estela-api/docker-conf/Dockerfile-celery-worker @@ -1,5 +1,7 @@ FROM python:3.9 +ARG M1_LOCAL=false + WORKDIR /home/estela COPY estela-api/requirements ./requirements @@ -7,6 +9,13 @@ COPY estela-api/requirements ./requirements ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 +# If M1 chip is used, install pyodbc and unixodbc-dev +RUN if [ "$M1_LOCAL" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/test.txt; \ + fi + RUN
pip install -r requirements/test.txt RUN if test -f "requirements/externalApps.txt"; then pip install -r requirements/externalApps.txt; fi diff --git a/estela-api/docker-conf/Dockerfile-django-api b/estela-api/docker-conf/Dockerfile-django-api index 0530dd86..31917340 100644 --- a/estela-api/docker-conf/Dockerfile-django-api +++ b/estela-api/docker-conf/Dockerfile-django-api @@ -1,12 +1,20 @@ FROM python:3.9 WORKDIR /home/estela +ARG M1_LOCAL=false COPY estela-api/requirements ./requirements ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 +# If M1 chip is used, install pyodbc and unixodbc-dev +RUN if [ "$M1_LOCAL" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/test.txt; \ + fi + RUN pip install -r requirements/test.txt RUN if test -f "requirements/externalApps.txt"; then pip install -r requirements/externalApps.txt; fi diff --git a/estela-api/requirements/base.in b/estela-api/requirements/base.in index 30521cb2..ef3830dd 100644 --- a/estela-api/requirements/base.in +++ b/estela-api/requirements/base.in @@ -16,6 +16,7 @@ google-cloud-storage mysqlclient minio pymongo[srv] +pyodbc redis gunicorn git+https://github.com/bitmakerla/estela-queue-adapter.git diff --git a/estela-api/requirements/base.txt b/estela-api/requirements/base.txt index 1202e2d4..182c15c8 100644 --- a/estela-api/requirements/base.txt +++ b/estela-api/requirements/base.txt @@ -163,6 +163,8 @@ pyasn1-modules==0.2.8 # via google-auth pymongo[srv]==3.12.0 # via -r base.in +pyodbc==5.0.1 + # via -r base.in pyparsing==2.4.7 # via packaging python-crontab==2.5.1 @@ -201,7 +203,7 @@ rsa==4.5 # google-auth ruamel-yaml==0.17.10 # via drf-yasg -ruamel-yaml-clib==0.2.6 +ruamel-yaml-clib==0.2.8 # via ruamel-yaml s3transfer==0.3.6 # via diff --git a/estela-api/requirements/deploy.txt b/estela-api/requirements/deploy.txt index c2b1d52a..efac4e50 100644 --- a/estela-api/requirements/deploy.txt +++ b/estela-api/requirements/deploy.txt @@ -241,6 +241,10 @@ pyasn1-modules==0.2.8 # -r base.txt # google-auth pymongo[srv]==3.12.0 + # via + # -r base.txt + # pymongo +pyodbc==5.0.1 # via -r base.txt pyparsing==2.4.7 # via @@ -296,7 +300,7 @@ ruamel-yaml==0.17.10 # via # -r base.txt # drf-yasg -ruamel-yaml-clib==0.2.6 +ruamel-yaml-clib==0.2.8 # via # -r base.txt # ruamel-yaml diff --git a/estela-api/requirements/test.txt b/estela-api/requirements/test.txt index a2d198b3..4a55859e 100644 --- a/estela-api/requirements/test.txt +++ b/estela-api/requirements/test.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.9 +# This file is autogenerated by pip-compile with Python 3.10 # by the following command: # # pip-compile test.in @@ -250,6 +250,8 @@ pyasn1-modules==0.2.8 # google-auth pymongo[srv]==3.12.0 # via -r base.txt +pyodbc==5.0.1 + # via -r base.txt pyparsing==2.4.7 # via # -r base.txt @@ -307,10 +309,8 @@ ruamel-yaml==0.17.10 # via # -r base.txt # drf-yasg -ruamel-yaml-clib==0.2.6 - # via - # -r base.txt - # ruamel-yaml +ruamel-yaml-clib==0.2.8 + # via -r base.txt s3transfer==0.3.6 # via # -r base.txt diff --git a/installation/Makefile b/installation/Makefile index a3b2bbf8..1b7c70b3 100644 --- a/installation/Makefile +++ b/installation/Makefile @@ -7,6 +7,7 @@ LOCAL_API_IP = $$(kubectl get services -n $${NAMESPACE} estela-django-api-servic RESOURCES = db registry minio zookeeper kafka SERVICES ?= django-api celery-worker celery-beat redis build-project PLATFORM ?= linux/$(shell uname -m) +M1_LOCAL ?= 
false .PHONY: resources @@ -33,9 +34,9 @@ delete-resources: .PHONY: build-all-images build-all-images: -. ./local/.env && for service in $(SERVICES); do \ - cd $(API_DIR) && docker build .. --file docker-conf/Dockerfile-$$service --tag $${LOCAL_REGISTRY}/estela-$$service:latest; \ + cd $(API_DIR) && docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file docker-conf/Dockerfile-$$service --tag $${LOCAL_REGISTRY}/estela-$$service:latest; \ done - -. ./local/.env && cd $(QUEUING_DIR) && docker build .. --file Dockerfile --tag $${LOCAL_REGISTRY}/estela-consumer:latest + -. ./local/.env && cd $(QUEUING_DIR) && docker build .. --build-arg M1_LOCAL=${M1_LOCAL} --file Dockerfile --tag $${LOCAL_REGISTRY}/estela-consumer:latest .PHONY: upload-all-images upload-all-images: diff --git a/queueing/Dockerfile b/queueing/Dockerfile index f384065d..176f0468 100644 --- a/queueing/Dockerfile +++ b/queueing/Dockerfile @@ -1,11 +1,17 @@ -FROM python:3.6 +FROM python:3.9 +ARG M1_LOCAL=false ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 WORKDIR /home/estela COPY queueing/requirements requirements +RUN if [ "$M1_LOCAL" = "true" ]; then \ + apt-get update && apt-get install -y unixodbc-dev && \ + pip install --no-binary :all: pyodbc && \ + sed -i '/pyodbc/d' requirements/consumer.txt; \ + fi RUN pip install -r requirements/consumer.txt COPY queueing/consumer.py . diff --git a/queueing/config/__init__.py b/queueing/config/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/queueing/config/database_manager.py b/queueing/config/database_manager.py index df088ccb..eaad1d0f 100644 --- a/queueing/config/database_manager.py +++ b/queueing/config/database_manager.py @@ -1,6 +1,6 @@ import os -from database_adapters.db_adapters import get_database_interface +from database_adapters.db_adapters import get_database_writer_interface if os.getenv("PRODUCTION") == "False": db_production = False @@ -8,7 +8,7 @@ db_production = True db_certificate_path = os.getenv("DB_CERTIFICATE_PATH") -db_client = get_database_interface( +db_client = get_database_writer_interface( engine=os.getenv("DB_ENGINE"), connection=os.getenv("DB_CONNECTION"), production=db_production, diff --git a/queueing/consumer.py b/queueing/consumer.py index 02b510bd..1cb72957 100644 --- a/queueing/consumer.py +++ b/queueing/consumer.py @@ -14,11 +14,12 @@ WORKER_POOL = int(os.getenv("WORKER_POOL", "10")) HEARTBEAT_TICK = int(os.getenv("HEARTBEAT_TICK", "300")) QUEUE_BASE_TIMEOUT = int(os.getenv("QUEUE_BASE_TIMEOUT", "5")) -QUEUE_MAX_TIMEOUT = int(os.getenv("QUEUE_MAX_TIMEOUT", "300")) +QUEUE_MAX_TIMEOUT = int(os.getenv("QUEUE_MAX_TIMEOUT", "10")) item_queue = Queue() inserters = {} heartbeat_lock = threading.Lock() +logger = logging.getLogger(__name__) def read_from_queue(): @@ -58,7 +59,7 @@ def heartbeat(): time.sleep(HEARTBEAT_TICK) with heartbeat_lock: - logging.debug("Heartbeat: A new inspection has started.") + logger.debug("Heartbeat: A new inspection has started.") for worker in workers: worker.join() @@ -71,9 +72,31 @@ def heartbeat(): ): del inserters[identifier] - logging.debug("Heartbeat: {} alive inserters.".format(len(inserters))) + logger.debug("Heartbeat: {} alive inserters.".format(len(inserters))) +def split_jid(jid): + return jid.split(".") + + +def get_db_name(item): + if "db_name" in item: + logger.debug("Using custom database name: {}".format(item["db_name"])) + return item["db_name"] + # This should be deprecated. 
+ if "jid" in item: + _, _, project = split_jid(item["jid"]) + logger.debug("Using generated database name: {}".format(project)) + return project + +def get_dataset_name(item, topic_name): + if "dataset_name" in item: + logger.debug("Using custom dataset_name: {}".format(item["dataset_name"])) + return item["dataset_name"] + job, spider, _ = split_jid(item["jid"]) + logger.debug("Using generated dataset_name: {}-{}-{}".format(spider, job, topic_name)) + return "{}-{}-{}".format(spider, job, topic_name) + def consume_from_queue_platform(topic_name): if db_client.get_connection(): logging.info("DB: connection established.") @@ -90,21 +113,19 @@ def consume_from_queue_platform(topic_name): _heartbeat = threading.Thread(target=heartbeat, daemon=True) _heartbeat.start() - for message in consumer: if heartbeat_lock.locked(): heartbeat_lock.acquire() heartbeat_lock.release() - job, spider, project = message.value["jid"].split(".") - - collection_name = "{}-{}-{}".format(spider, job, topic_name) - identifier = "{}/{}".format(project, collection_name) + db_name = get_db_name(message.value) + dataset_name = get_dataset_name(message.value, topic_name) + identifier = "{}/{}".format(db_name, dataset_name) unique = message.value.get("unique", "") == "True" if inserters.get(identifier) is None: inserters[identifier] = Inserter( - db_client, project, collection_name, unique, topic_name + db_client, db_name, dataset_name, unique, topic_name ) inserters[identifier].add_pending_item() diff --git a/queueing/inserter.py b/queueing/inserter.py index ecf8d40f..0b53837e 100644 --- a/queueing/inserter.py +++ b/queueing/inserter.py @@ -6,10 +6,13 @@ from estela_queue_adapter import get_producer_interface +logger = logging.getLogger("consumer.inserter") + BATCH_SIZE_THRESHOLD = int(os.getenv("BATCH_SIZE_THRESHOLD", "4096")) INSERT_TIME_THRESHOLD = int(os.getenv("INSERT_TIME_THRESHOLD", "5")) ACTIVITY_TIME_THRESHOLD = int(os.getenv("ACTIVITY_TIME_THRESHOLD", "600")) +MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) # In order to avoid infinite loops producer = get_producer_interface() producer.get_connection() @@ -31,40 +34,44 @@ def __init__(self, client, database_name, collection_name, unique, topic): self.__last_insertion = time.time() self.__pending_items_count = 0 - logging.info("New Inserter created for {}.".format(self.identifier)) + logger.info("New Inserter created for {}.".format(self.identifier)) - def is_job_stats(self, collection_name): - return "job_stats" == collection_name.split("-")[2] + def is_job_stats(self): + return "job_stats" == self.topic def __handle_insertion_error(self, response, items): - logging.warning( + logger.warning( "The exception [{}] occurred during the insertion of {} items in {}.".format( response.error, len(items), self.identifier ) ) for item in items: + if item.get("retries", 0) > MAX_RETRIES: + logger.error("Item: %s has reached maximum retries.", item) + continue if item["payload"].get("_id"): del item["payload"]["_id"] if response.need_upsert: item["need_upsert"] = "True" + item["retries"] = item.get("retries", 0) + 1 producer.send(self.topic, item) def __insert_items(self, reason): - if self.is_job_stats(self.collection_name): + if self.is_job_stats(): self.__items[0]["payload"]["_id"] = self.collection_name - response = self.__client.insert_one_to_collection( + response = self.__client.insert_one_to_dataset( self.database_name, "job_stats", self.__items[0]["payload"], ) else: - response = self.__client.insert_many_to_collection( + response = 
self.__client.insert_many_to_dataset( self.database_name, self.collection_name, [item["payload"] for item in self.__items], ) if response.ok: - logging.info( + logger.info( "{} documents inserted [{}] in {}.".format( len(self.__items), reason, self.identifier ) @@ -86,11 +93,11 @@ def has_pending_items(self): def insert(self, item): if self.unique or item.get("need_upsert"): - response = self.__client.insert_one_to_unique_collection( + response = self.__client.insert_one_to_unique_dataset( self.database_name, self.collection_name, item["payload"] ) if response.ok: - logging.debug("1 document inserted in {}.".format(self.identifier)) + logger.debug("1 document inserted in {}. with values {}".format(self.identifier, item["payload"])) else: self.__handle_insertion_error(response, [item]) else: diff --git a/queueing/requirements/consumer.in b/queueing/requirements/consumer.in index 059598a2..200ee22e 100644 --- a/queueing/requirements/consumer.in +++ b/queueing/requirements/consumer.in @@ -1,4 +1,5 @@ pip-tools pymongo[srv] +pyodbc black git+https://github.com/bitmakerla/estela-queue-adapter.git diff --git a/queueing/requirements/consumer.txt b/queueing/requirements/consumer.txt index f76b00db..63f562a8 100644 --- a/queueing/requirements/consumer.txt +++ b/queueing/requirements/consumer.txt @@ -1,13 +1,13 @@ # -# This file is autogenerated by pip-compile with python 3.9 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: # -# pip-compile requirements/consumer.in +# pip-compile consumer.in # appdirs==1.4.4 # via black black==21.7b0 - # via -r requirements/consumer.in + # via -r consumer.in click==8.0.0 # via # black @@ -15,7 +15,7 @@ click==8.0.0 dnspython==1.16.0 # via pymongo estela-queue-adapter @ git+https://github.com/bitmakerla/estela-queue-adapter.git - # via -r requirements/consumer.in + # via -r consumer.in kafka-python==2.0.2 # via estela-queue-adapter mypy-extensions==0.4.3 @@ -25,9 +25,11 @@ pathspec==0.9.0 pep517==0.10.0 # via pip-tools pip-tools==6.1.0 - # via -r requirements/consumer.in + # via -r consumer.in pymongo[srv]==3.11.4 - # via -r requirements/consumer.in + # via -r consumer.in +pyodbc==5.0.1 + # via -r consumer.in regex==2021.8.3 # via black toml==0.10.2
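
A minimal usage sketch of the new writer factory, assuming the environment variables already read by queueing/config/database_manager.py (DB_ENGINE, DB_CONNECTION, PRODUCTION, DB_CERTIFICATE_PATH); the "sqlserver" fallback, database name, table name, and item values below are illustrative only:

import os

from database_adapters.db_adapters import get_database_writer_interface

# Select the writer through the factory added in this patch;
# engine "sqlserver" maps to SqlServerWriterAdapter.
db_client = get_database_writer_interface(
    engine=os.getenv("DB_ENGINE", "sqlserver"),
    connection=os.getenv("DB_CONNECTION"),
    production=os.getenv("PRODUCTION") != "False",
    certificate_path=os.getenv("DB_CERTIFICATE_PATH"),
)

if db_client.get_connection():
    # The adapter builds a parameterized INSERT from the item keys.
    response = db_client.insert_one_to_dataset(
        "example_db",     # illustrative database name
        "example_table",  # illustrative table name
        {"url": "https://example.com", "title": "demo"},
    )
    if not response.ok:
        print(f"Insert failed: {response.error}")

Connections are held in threading.local(), so each consumer worker thread opens and reuses its own pyodbc connection when calling the adapter.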