From 34965911d60215de5aad925394525c66cf937ae1 Mon Sep 17 00:00:00 2001 From: Alexey Date: Tue, 23 Jul 2024 19:11:15 +0700 Subject: [PATCH 1/3] Forecast with mindsdb initial --- .../src/main/resources/forecast_training.bpmn | 242 ++++++++++++++++++ external_workers/ml_workers/__init__.py | 0 .../ml_workers/forecast_worker.py | 195 ++++++++++++++ 3 files changed, 437 insertions(+) create mode 100644 engine/src/main/resources/forecast_training.bpmn create mode 100644 external_workers/ml_workers/__init__.py create mode 100644 external_workers/ml_workers/forecast_worker.py diff --git a/engine/src/main/resources/forecast_training.bpmn b/engine/src/main/resources/forecast_training.bpmn new file mode 100644 index 00000000..7dba7d97 --- /dev/null +++ b/engine/src/main/resources/forecast_training.bpmn @@ -0,0 +1,242 @@ + + + + + + Flow_0f2nvj6 + + + + + + + + + + + + + + + + + Flow_0f2nvj6 + Flow_14q3psa + Flow_1flrme2 + + + Flow_1flrme2 + Flow_0ska0b9 + + + + Flow_0ska0b9 + Flow_0xtrb9y + Flow_1dzh1ys + Flow_1b9wrwd + + + Flow_1b9wrwd + Flow_139ovdl + Flow_14q3psa + Flow_1dozy0t + + + + + Flow_139ovdl + Flow_0xtrb9y + + PT15S + + + + + ${status == 'error'} + + + ${status == 'complete'} + + + Flow_1browlk + Flow_03dmjrw + + + + Flow_03dmjrw + Flow_1l0n1pi + + + Flow_0c975pl + Flow_1l0n1pi + Flow_1lw4113 + Flow_0c8lxk7 + + + Flow_1lw4113 + Flow_1dzh1ys + + + + + Flow_1dozy0t + Flow_1browlk + Flow_0c975pl + + + ${is_creating} + + + + + Flow_1c0dky7 + + + ${delete_model} + + + + + Flow_0c8lxk7 + Flow_1c0dky7 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/external_workers/ml_workers/__init__.py b/external_workers/ml_workers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/external_workers/ml_workers/forecast_worker.py b/external_workers/ml_workers/forecast_worker.py new file mode 100644 index 00000000..da50aff4 --- /dev/null +++ b/external_workers/ml_workers/forecast_worker.py @@ -0,0 +1,195 @@ +import os + +from camunda.external_task.external_task import ExternalTask, TaskResult +from camunda.external_task.external_task_worker import ExternalTaskWorker +import requests +import json + +MINDSDB_USER = os.getenv("MINDSDB_USER", "mindsdb") +MINDSDB_PASS = os.getenv("MINDSDB_PASS", "mindsdb") +MINDSDB_API_URL = 'http://0.0.0.0:47334' +MINDSDB_PROJECT_NAME = 'mindsdb' + +default_config = { + "auth_basic": {"username": "demo", "password": "demo"}, + "maxTasks": 1, + "lockDuration": 10000, + "asyncResponseTimeout": 5000, + "retries": 3, + "retryTimeout": 15000, + "sleepSeconds": 30 +} + +# Define the topics to be subscribed to +TOPICS = [ + "ml_create_model", + "ml_check_model_status", + "ml_create_view", + "ml_create_wh_query", + "ml_retrain_model", + "ml_drop_model" +] + + +def model_exists(project_name, model_name): + try: + response = requests.get( + f"{MINDSDB_API_URL}/api/projects/{project_name}/models/{model_name}", + auth=(MINDSDB_USER, MINDSDB_PASS) + ) + response.raise_for_status() + return True + except requests.exceptions.HTTPError as http_err: + if response.status_code == 404: + return False + print(f"HTTP error occurred: {http_err}") + return False + except Exception as err: + print(f"Other error occurred: {err}") + return False + + +def delete_model(project_name, model_name): + try: + response = requests.delete( + f"{MINDSDB_API_URL}/api/projects/{project_name}/models/{model_name}", + auth=(MINDSDB_USER, MINDSDB_PASS) + ) + response.raise_for_status() + print(f"Model {project_name}.{model_name} deleted successfully.") + except requests.exceptions.RequestException as e: + print(f"Failed to delete model {project_name}.{model_name}: {e}") + + +def handle_create_model(task: ExternalTask) -> TaskResult: + variables = task.get_variables() + camunda_user_id = variables.get('camunda_user_id') + data_source = variables.get('data_source') + engine = variables.get('engine') + token_address = variables.get('token_address') + timestamp_start = variables.get('timestamp_start') + window = variables.get('window') + horizon = variables.get('horizon') + + model_name = f"{camunda_user_id}_{data_source}_{engine}" + + if model_exists(MINDSDB_PROJECT_NAME, model_name): + delete_model(MINDSDB_PROJECT_NAME, model_name) + + # Construct the SQL query for model creation + query = f""" + CREATE MODEL {MINDSDB_PROJECT_NAME}.{model_name} + FROM clickhouse_ethereum ( + SELECT + toUnixTimestamp(toStartOfHour(FROM_UNIXTIME(timestamp), 'UTC'), 'UTC') as time, + token_address as token_address, + max(c_s).2 as price + FROM {data_source} + WHERE token_address = '{token_address}' AND timestamp >= {timestamp_start} + GROUP BY time, token_address + ) + PREDICT price + ORDER BY time + GROUP BY token_address + HORIZON {horizon} + WINDOW {window} + USING ENGINE = '{engine}'; + """ + + payload = { + "query": query + } + + try: + # Make the HTTP request to create the model + response = requests.post( + f"{MINDSDB_API_URL}/api/projects/{MINDSDB_PROJECT_NAME}/models", + json=payload, + auth=(MINDSDB_USER, MINDSDB_PASS) + ) + response.raise_for_status() + print(f"Model {model_name} created successfully: {response.json()}") + variables['is_creating'] = True + return task.complete(variables) + except requests.exceptions.RequestException as e: + print(f"Failed to create model {model_name}: {e}") + return task.failure(error_message=str(e), error_details=str(e), max_retries=0, retry_timeout=0) + + +def handle_check_model_status(task: ExternalTask) -> TaskResult: + variables = task.get_variables() + camunda_user_id = variables.get('camunda_user_id') + data_source = variables.get('data_source') + engine = variables.get('engine') + token_address = variables.get('token_address') + timestamp_start = variables.get('timestamp_start') + window = variables.get('window') + horizon = variables.get('horizon') + + model_name = f"{camunda_user_id}_{data_source}_{engine}" + + try: + response = requests.get( + f"{MINDSDB_API_URL}/api/projects/{MINDSDB_PROJECT_NAME}/models/{model_name}", + auth=(MINDSDB_USER, MINDSDB_PASS) + ) + response.raise_for_status() + model_status = response.json().get('status', 'unknown') + variables['status'] = model_status + print(f"Model status for {MINDSDB_PROJECT_NAME}.{model_name}: {model_status}") + return task.complete(variables) + except requests.exceptions.RequestException as e: + print(f"Failed to get model status for {MINDSDB_PROJECT_NAME}.{model_name}: {e}") + return task.failure(error_message=str(e), error_details=str(e), retries=0) + + +def handle_create_view(task: ExternalTask) -> TaskResult: + print(f"Handling create view for task: {task.get_variables()}") + # Not implemented + # return task.complete() + + +def handle_create_wh_query(task: ExternalTask) -> TaskResult: + print(f"Handling create warehouse query for task: {task.get_variables()}") + # Not implemented + # return task.complete() + + +def handle_retrain_model(task: ExternalTask) -> TaskResult: + print(f"Handling retrain model for task: {task.get_variables()}") + # Not implemented + # return task.complete() + + +def handle_drop_model(task: ExternalTask) -> TaskResult: + print(f"Handling drop model for task: {task.get_variables()}") + # Not implemented + # return task.complete() + + +def handle_task(task: ExternalTask) -> TaskResult: + topic = task.get_topic_name() + + if topic == "ml_create_model": + return handle_create_model(task) + elif topic == "ml_check_model_status": + return handle_check_model_status(task) + elif topic == "ml_create_view": + return handle_create_view(task) + elif topic == "ml_create_wh_query": + return handle_create_wh_query(task) + elif topic == "ml_retrain_model": + return handle_retrain_model(task) + elif topic == "ml_drop_model": + return handle_drop_model(task) + else: + print(f"Unknown topic: {topic}") + return task.failure('unknown topic', 'Unknown topic', 0, 0) + + +if __name__ == '__main__': + worker = ExternalTaskWorker(worker_id="ml_workers", + base_url="http://localhost:8080/engine-rest", + config=default_config) + worker.subscribe(topic_names=TOPICS, action=handle_task) + From 17038e8a2460b1a7fff0c47451ba538dbe985d4d Mon Sep 17 00:00:00 2001 From: Alexey Date: Wed, 24 Jul 2024 23:11:19 +0700 Subject: [PATCH 2/3] add create view --- .../ml_workers/forecast_worker.py | 51 +++++++++++++++++-- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/external_workers/ml_workers/forecast_worker.py b/external_workers/ml_workers/forecast_worker.py index da50aff4..1f9b74ac 100644 --- a/external_workers/ml_workers/forecast_worker.py +++ b/external_workers/ml_workers/forecast_worker.py @@ -144,9 +144,54 @@ def handle_check_model_status(task: ExternalTask) -> TaskResult: def handle_create_view(task: ExternalTask) -> TaskResult: - print(f"Handling create view for task: {task.get_variables()}") - # Not implemented - # return task.complete() + variables = task.get_variables() + camunda_user_id = variables.get('camunda_user_id') + data_source = variables.get('data_source') + engine = variables.get('engine') + token_address = variables.get('token_address') + timestamp_start = variables.get('timestamp_start') + project_name = MINDSDB_PROJECT_NAME + network = variables.get('network', 'ethereum') + + view_name = f"{camunda_user_id}_{data_source}_{engine}_view" + + query = f""" + SELECT + * + FROM clickhouse_{network} ( + SELECT + toStartOfHour(FROM_UNIXTIME(timestamp), 'UTC') as time, + token_address as token_address, + max(c_s).2 as price + FROM {data_source} + WHERE token_address = '{token_address}' AND timestamp >= {timestamp_start} + GROUP BY time, token_address + ); + """ + + payload = { + "view": { + "name": view_name, + "query": query + } + } + + try: + print(f"Creating view with name: {view_name}") + print(f"Query: {query}") + # Make the HTTP request to create the view + response = requests.post( + f"{MINDSDB_API_URL}/api/projects/{project_name}/views", + json=payload, + auth=(MINDSDB_USER, MINDSDB_PASS) + ) + response.raise_for_status() + print(f"View {view_name} created successfully: {response.json()}") + return task.complete() + except requests.exceptions.RequestException as e: + print(f"Failed to create view {view_name}: {e}") + return task.failure(error_message=str(e), error_details=str(e), retries=0) + def handle_create_wh_query(task: ExternalTask) -> TaskResult: From f5643d1f973e8809a6684a1e1487b3b6b67784e7 Mon Sep 17 00:00:00 2001 From: Alexey Date: Fri, 16 Aug 2024 16:15:36 +0700 Subject: [PATCH 3/3] finish forecast training process workers --- .../src/main/resources/forecast_training.bpmn | 109 +++++----- .../ml_workers/forecast_worker.py | 197 +++++++++++------- 2 files changed, 180 insertions(+), 126 deletions(-) diff --git a/engine/src/main/resources/forecast_training.bpmn b/engine/src/main/resources/forecast_training.bpmn index 7dba7d97..f5440a27 100644 --- a/engine/src/main/resources/forecast_training.bpmn +++ b/engine/src/main/resources/forecast_training.bpmn @@ -18,6 +18,7 @@ + Flow_0f2nvj6 @@ -67,6 +68,12 @@ Flow_1l0n1pi + + + + + + Flow_0c975pl Flow_1l0n1pi Flow_1lw4113 @@ -103,6 +110,17 @@ + + + + + + + + + + + @@ -110,12 +128,15 @@ + + + + + + - - - @@ -124,6 +145,13 @@ + + + + + + + @@ -131,41 +159,14 @@ - - - - - - - - - - - - - - - - - - - - - + + + + - - - - - - - - - - @@ -181,6 +182,11 @@ + + + + + @@ -196,6 +202,20 @@ + + + + + + + + + + + + + + @@ -203,10 +223,6 @@ - - - - @@ -216,27 +232,18 @@ - - - - - - - - - - - - - + + + + diff --git a/external_workers/ml_workers/forecast_worker.py b/external_workers/ml_workers/forecast_worker.py index 1f9b74ac..ccc3c484 100644 --- a/external_workers/ml_workers/forecast_worker.py +++ b/external_workers/ml_workers/forecast_worker.py @@ -1,14 +1,16 @@ import os - -from camunda.external_task.external_task import ExternalTask, TaskResult -from camunda.external_task.external_task_worker import ExternalTaskWorker import requests import json +from camunda.external_task.external_task import ExternalTask, TaskResult +from camunda.external_task.external_task_worker import ExternalTaskWorker +CAMUNDA_URL = os.getenv('CAMUNDA_URL', 'http://localhost:8080/engine-rest') MINDSDB_USER = os.getenv("MINDSDB_USER", "mindsdb") MINDSDB_PASS = os.getenv("MINDSDB_PASS", "mindsdb") MINDSDB_API_URL = 'http://0.0.0.0:47334' MINDSDB_PROJECT_NAME = 'mindsdb' +WAREHOUSE_URL = os.getenv('WAREHOUSE_URL', 'http://localhost:5001') +WAREHOUSE_API_KEY = os.getenv('WAREHOUSE_API_KEY', 'SUAK8G8g28PgoTcNF6maTpmr59Th9Ssi5zvfojtj') default_config = { "auth_basic": {"username": "demo", "password": "demo"}, @@ -30,6 +32,11 @@ "ml_drop_model" ] +def get_model_name(camunda_user_id, data_source, engine): + return f"{camunda_user_id}_{data_source}_{engine}" + +def get_view_name(camunda_user_id, data_source, engine): + return f"{camunda_user_id}_{data_source}_{engine}_view" def model_exists(project_name, model_name): try: @@ -48,22 +55,17 @@ def model_exists(project_name, model_name): print(f"Other error occurred: {err}") return False - -def delete_model(project_name, model_name): +def delete_resource(url): try: - response = requests.delete( - f"{MINDSDB_API_URL}/api/projects/{project_name}/models/{model_name}", - auth=(MINDSDB_USER, MINDSDB_PASS) - ) + response = requests.delete(url, auth=(MINDSDB_USER, MINDSDB_PASS)) response.raise_for_status() - print(f"Model {project_name}.{model_name} deleted successfully.") + print(f"Resource at {url} deleted successfully.") except requests.exceptions.RequestException as e: - print(f"Failed to delete model {project_name}.{model_name}: {e}") + print(f"Failed to delete resource at {url}: {e}") - -def handle_create_model(task: ExternalTask) -> TaskResult: - variables = task.get_variables() +def create_model(variables): camunda_user_id = variables.get('camunda_user_id') + network = variables.get('network') data_source = variables.get('data_source') engine = variables.get('engine') token_address = variables.get('token_address') @@ -71,15 +73,14 @@ def handle_create_model(task: ExternalTask) -> TaskResult: window = variables.get('window') horizon = variables.get('horizon') - model_name = f"{camunda_user_id}_{data_source}_{engine}" + model_name = get_model_name(camunda_user_id, data_source, engine) if model_exists(MINDSDB_PROJECT_NAME, model_name): - delete_model(MINDSDB_PROJECT_NAME, model_name) + delete_resource(f"{MINDSDB_API_URL}/api/projects/{MINDSDB_PROJECT_NAME}/models/{model_name}") - # Construct the SQL query for model creation query = f""" CREATE MODEL {MINDSDB_PROJECT_NAME}.{model_name} - FROM clickhouse_ethereum ( + FROM clickhouse_{network} ( SELECT toUnixTimestamp(toStartOfHour(FROM_UNIXTIME(timestamp), 'UTC'), 'UTC') as time, token_address as token_address, @@ -96,12 +97,9 @@ def handle_create_model(task: ExternalTask) -> TaskResult: USING ENGINE = '{engine}'; """ - payload = { - "query": query - } + payload = {"query": query} try: - # Make the HTTP request to create the model response = requests.post( f"{MINDSDB_API_URL}/api/projects/{MINDSDB_PROJECT_NAME}/models", json=payload, @@ -110,23 +108,13 @@ def handle_create_model(task: ExternalTask) -> TaskResult: response.raise_for_status() print(f"Model {model_name} created successfully: {response.json()}") variables['is_creating'] = True - return task.complete(variables) + return TaskResult.complete(variables) except requests.exceptions.RequestException as e: print(f"Failed to create model {model_name}: {e}") - return task.failure(error_message=str(e), error_details=str(e), max_retries=0, retry_timeout=0) - - -def handle_check_model_status(task: ExternalTask) -> TaskResult: - variables = task.get_variables() - camunda_user_id = variables.get('camunda_user_id') - data_source = variables.get('data_source') - engine = variables.get('engine') - token_address = variables.get('token_address') - timestamp_start = variables.get('timestamp_start') - window = variables.get('window') - horizon = variables.get('horizon') + return TaskResult.failure(error_message=str(e), error_details=str(e), max_retries=0, retry_timeout=0) - model_name = f"{camunda_user_id}_{data_source}_{engine}" +def check_model_status(variables): + model_name = get_model_name(variables['camunda_user_id'], variables['data_source'], variables['engine']) try: response = requests.get( @@ -137,14 +125,12 @@ def handle_check_model_status(task: ExternalTask) -> TaskResult: model_status = response.json().get('status', 'unknown') variables['status'] = model_status print(f"Model status for {MINDSDB_PROJECT_NAME}.{model_name}: {model_status}") - return task.complete(variables) + return TaskResult.complete(variables) except requests.exceptions.RequestException as e: print(f"Failed to get model status for {MINDSDB_PROJECT_NAME}.{model_name}: {e}") - return task.failure(error_message=str(e), error_details=str(e), retries=0) - + return TaskResult.failure(error_message=str(e), error_details=str(e), retries=0) -def handle_create_view(task: ExternalTask) -> TaskResult: - variables = task.get_variables() +def create_view(variables): camunda_user_id = variables.get('camunda_user_id') data_source = variables.get('data_source') engine = variables.get('engine') @@ -153,7 +139,7 @@ def handle_create_view(task: ExternalTask) -> TaskResult: project_name = MINDSDB_PROJECT_NAME network = variables.get('network', 'ethereum') - view_name = f"{camunda_user_id}_{data_source}_{engine}_view" + view_name = get_view_name(camunda_user_id, data_source, engine) query = f""" SELECT @@ -176,10 +162,10 @@ def handle_create_view(task: ExternalTask) -> TaskResult: } } + if model_exists(project_name, view_name): + delete_resource(f"{MINDSDB_API_URL}/api/projects/{project_name}/views/{view_name}") + try: - print(f"Creating view with name: {view_name}") - print(f"Query: {query}") - # Make the HTTP request to create the view response = requests.post( f"{MINDSDB_API_URL}/api/projects/{project_name}/views", json=payload, @@ -187,54 +173,115 @@ def handle_create_view(task: ExternalTask) -> TaskResult: ) response.raise_for_status() print(f"View {view_name} created successfully: {response.json()}") - return task.complete() + return TaskResult.complete() except requests.exceptions.RequestException as e: print(f"Failed to create view {view_name}: {e}") - return task.failure(error_message=str(e), error_details=str(e), retries=0) + return TaskResult.failure(error_message=str(e), error_details=str(e), retries=0) +def create_wh_query(variables): + print(f"Handling create warehouse query for task: {variables}") + camunda_user_id = variables.get('camunda_user_id') + data_source = variables.get('data_source') + engine = variables.get('engine') + token_address = variables.get('token_address') + + url = f'{WAREHOUSE_URL}/api/data_sources' + data_sources = requests.get( + url=url, + headers={ + 'Content-Type': 'application/json', + 'Authorization': WAREHOUSE_API_KEY + } + ).json() + try: + mindsdb_ds = [i for i in data_sources if i['type'] == 'mindsdb'][-1] + except IndexError as e: + print(f"Failed to get datasource mindsdb: {e}") + return TaskResult.failure(error_message=str(e), error_details=str(e), retries=0) + + try: + model_name = get_model_name(camunda_user_id, data_source, engine) + view_name = get_view_name(camunda_user_id, data_source, engine) + query = f"""SELECT + * + FROM + mindsdb.{view_name} AS d + JOIN + mindsdb.{model_name} AS m + ON d.time = m.time + WHERE + d.token_address = '{token_address}'""" + + data = { + "name": model_name, + "query": query, + "data_source_id": mindsdb_ds['id'] + } + + url = f'{WAREHOUSE_URL}/api/queries' + response = requests.post( + url=url, + data=json.dumps(data), + headers={ + 'Content-Type': 'application/json', + 'Authorization': WAREHOUSE_API_KEY + } + ) + response.raise_for_status() -def handle_create_wh_query(task: ExternalTask) -> TaskResult: - print(f"Handling create warehouse query for task: {task.get_variables()}") - # Not implemented - # return task.complete() + variables['curl'] = f"""curl -X POST + 'https://api.dev.dex.guru/wh/{model_name}?api_key={WAREHOUSE_API_KEY}' + -H 'Content-Type: application/json'""" + variables['curl'] = variables['curl'] + "--data '{\"parameters\": {}}'" + return TaskResult.complete(variables) + except requests.exceptions.RequestException as e: + print(f"Failed to create warehouse query for {view_name}: {e}") + return TaskResult.failure(error_message=str(e), error_details=str(e), retries=0) +def retrain_model(variables): + return create_model(variables) # Simply call create_model as retrain logic is the same -def handle_retrain_model(task: ExternalTask) -> TaskResult: - print(f"Handling retrain model for task: {task.get_variables()}") - # Not implemented - # return task.complete() +def drop_model(variables): + camunda_user_id = variables.get('camunda_user_id') + data_source = variables.get('data_source') + engine = variables.get('engine') + project_name = MINDSDB_PROJECT_NAME + model_name = get_model_name(camunda_user_id, data_source, engine) + view_name = get_view_name(camunda_user_id, data_source, engine) -def handle_drop_model(task: ExternalTask) -> TaskResult: - print(f"Handling drop model for task: {task.get_variables()}") - # Not implemented - # return task.complete() + model_url = f"{MINDSDB_API_URL}/api/projects/{project_name}/models/{model_name}" + view_url = f"{MINDSDB_API_URL}/api/projects/{project_name}/views/{view_name}" + try: + delete_resource(view_url) + delete_resource(model_url) + return TaskResult.complete(variables) + except requests.exceptions.RequestException as e: + print(f"Failed to drop model or view: {e}") + return TaskResult.failure(error_message=str(e), error_details=str(e), retries=0) def handle_task(task: ExternalTask) -> TaskResult: topic = task.get_topic_name() + handlers = { + "ml_create_model": create_model, + "ml_check_model_status": check_model_status, + "ml_create_view": create_view, + "ml_create_wh_query": create_wh_query, + "ml_retrain_model": retrain_model, + "ml_drop_model": drop_model + } - if topic == "ml_create_model": - return handle_create_model(task) - elif topic == "ml_check_model_status": - return handle_check_model_status(task) - elif topic == "ml_create_view": - return handle_create_view(task) - elif topic == "ml_create_wh_query": - return handle_create_wh_query(task) - elif topic == "ml_retrain_model": - return handle_retrain_model(task) - elif topic == "ml_drop_model": - return handle_drop_model(task) + handler = handlers.get(topic) + if handler: + return handler(task.get_variables()) else: print(f"Unknown topic: {topic}") - return task.failure('unknown topic', 'Unknown topic', 0, 0) - + return TaskResult.failure('unknown topic', 'Unknown topic', 0, 0) if __name__ == '__main__': worker = ExternalTaskWorker(worker_id="ml_workers", - base_url="http://localhost:8080/engine-rest", + base_url=CAMUNDA_URL, config=default_config) worker.subscribe(topic_names=TOPICS, action=handle_task) -