diff --git a/cads_processing_api_service/clients.py b/cads_processing_api_service/clients.py index 76b82a6..086103c 100644 --- a/cads_processing_api_service/clients.py +++ b/cads_processing_api_service/clients.py @@ -108,7 +108,6 @@ def get_processes( back : bool | None, optional Specifies in which sense the list of processes should be traversed, used for pagination. """ - structlog.contextvars.bind_contextvars(client_endpoint="get_processes") logger.info("get_processes") statement = sqlalchemy.select(self.process_table) sort_key, sort_dir = utils.parse_sortby(sortby.name) @@ -161,10 +160,7 @@ def get_process( ogc_api_processes_fastapi.models.ProcessDescription Process description. """ - structlog.contextvars.bind_contextvars( - client_endpoint="get_process", process_id=process_id - ) - logger.info("get_process") + logger.info("get_process", process_id=process_id) catalogue_sessionmaker = db_utils.get_catalogue_sessionmaker( db_utils.ConnectionMode.read ) @@ -219,13 +215,12 @@ def post_process_execution( models.StatusInfo Submitted job's status information. """ - structlog.contextvars.bind_contextvars( + logger.info( + "post_process_execution", **auth_info.model_dump(), - client_endpoint="post_process_execution", process_id=process_id, execution_content=execution_content.model_dump(), ) - logger.info("post_process_execution") _ = limits.check_rate_limits( SETTINGS.rate_limits.process_execution.post, auth_info, @@ -288,7 +283,6 @@ def post_process_execution( ) ) job_id = str(uuid.uuid4()) - structlog.contextvars.bind_contextvars(job_id=job_id) job_kwargs = adaptors.make_system_job_kwargs( dataset, request_inputs, adaptor.resources ) @@ -381,11 +375,7 @@ def get_jobs( models.JobList List of jobs status information. """ - structlog.contextvars.bind_contextvars( - **auth_info.model_dump(), - client_endpoint="get_jobs", - ) - logger.info("get_jobs") + logger.info("get_jobs", **auth_info.model_dump()) _ = limits.check_rate_limits( SETTINGS.rate_limits.jobs.get, auth_info, @@ -505,12 +495,7 @@ def get_job( models.StatusInfo Job status information. """ - structlog.contextvars.bind_contextvars( - **auth_info.model_dump(), - job_id=job_id, - client_endpoint="get_job", - ) - logger.info("get_job") + logger.info("get_job", job_id=job_id, **auth_info.model_dump()) _ = limits.check_rate_limits( SETTINGS.rate_limits.job.get, auth_info, @@ -526,9 +511,6 @@ def get_job( else db_utils.ConnectionMode.read ) try: - structlog.contextvars.bind_contextvars( - compute_connection_mode=compute_connection_mode - ) compute_sessionmaker = db_utils.get_compute_sessionmaker( mode=compute_connection_mode ) @@ -554,12 +536,8 @@ def get_job( if compute_connection_mode == db_utils.ConnectionMode.write: raise else: - compute_connection_mode = db_utils.ConnectionMode.write - structlog.contextvars.bind_contextvars( - compute_connection_mode=compute_connection_mode - ) compute_sessionmaker = db_utils.get_compute_sessionmaker( - mode=compute_connection_mode + mode=db_utils.ConnectionMode.write ) with compute_sessionmaker() as compute_session: job = utils.get_job_from_broker_db( @@ -637,10 +615,7 @@ def get_job_results( ogc_api_processes_fastapi.models.Results Job results. """ - structlog.contextvars.bind_contextvars( - **auth_info.model_dump(), job_id=job_id, client_endpoint="get_job_results" - ) - logger.info("get_job_results") + logger.info("get_job_results", job_id=job_id, **auth_info.model_dump()) _ = limits.check_rate_limits( SETTINGS.rate_limits.job_results.get, auth_info, @@ -651,9 +626,6 @@ def get_job_results( else db_utils.ConnectionMode.read ) try: - structlog.contextvars.bind_contextvars( - compute_connection_mode=compute_connection_mode - ) compute_sessionmaker = db_utils.get_compute_sessionmaker( mode=compute_connection_mode ) @@ -670,10 +642,6 @@ def get_job_results( if compute_connection_mode == db_utils.ConnectionMode.write: raise else: - compute_connection_mode = db_utils.ConnectionMode.write - structlog.contextvars.bind_contextvars( - compute_connection_mode=compute_connection_mode - ) compute_sessionmaker = db_utils.get_compute_sessionmaker( mode=db_utils.ConnectionMode.write ) @@ -709,10 +677,7 @@ def delete_job( ogc_api_processes_fastapi.models.StatusInfo Job status information """ - structlog.contextvars.bind_contextvars( - **auth_info.model_dump(), job_id=job_id, operation="delete_job" - ) - logger.info("delete_job") + logger.info("delete_job", job_id=job_id, **auth_info.model_dump()) _ = limits.check_rate_limits( SETTINGS.rate_limits.job.delete, auth_info, diff --git a/cads_processing_api_service/main.py b/cads_processing_api_service/main.py index acb18a7..48500f9 100644 --- a/cads_processing_api_service/main.py +++ b/cads_processing_api_service/main.py @@ -50,7 +50,18 @@ def add_user_request_flag( @asynccontextmanager async def lifespan(application: fastapi.FastAPI) -> AsyncGenerator[Any, None]: - cads_common.logging.structlog_configure([add_user_request_flag]) + cads_common.logging.structlog_configure( + [ + add_user_request_flag, + structlog.processors.CallsiteParameterAdder( + [ + structlog.processors.CallsiteParameter.FILENAME, + structlog.processors.CallsiteParameter.FUNC_NAME, + structlog.processors.CallsiteParameter.LINENO, + ], + ), + ] + ) cads_common.logging.logging_configure() yield @@ -93,7 +104,10 @@ async def initialize_logger( ) -> Any: structlog.contextvars.clear_contextvars() trace_id = str(uuid.uuid4()) - structlog.contextvars.bind_contextvars(trace_id=trace_id, request=request.url.path) + user_ip = request.headers.get("X-Real-IP", None) + structlog.contextvars.bind_contextvars( + trace_id=trace_id, request=request.url.path, user_ip=user_ip + ) response = await call_next(request) return response diff --git a/cads_processing_api_service/utils.py b/cads_processing_api_service/utils.py index e7ca49f..d55777a 100644 --- a/cads_processing_api_service/utils.py +++ b/cads_processing_api_service/utils.py @@ -463,9 +463,8 @@ def get_job_from_broker_db( """ try: job = cads_broker.database.get_request(request_uid=job_id, session=session) - structlog.contextvars.bind_contextvars(job_status=job.status) if job.status in ("dismissed", "deleted"): - logger.error("job status is dismissed or deleted") + logger.error("job status is dismissed or deleted", job_status=job.status) raise ogc_api_processes_fastapi.exceptions.NoSuchJob( detail=f"job {job_id} {job.status}" )