Skip to content

Commit 0cb345d

Browse files
authored
Added a guide & sample for a custom logger client implementation. (opensearch-project#579)
* Added a guide & sample for a custom logger client implementation. Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Black formatter Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> * Changes from PR review Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Fixed import formatting in sample code for gudie. Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Fixed nox formatting of log collection sample module. Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Added types to log_collection_sample.py Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Added type ignore to StramHandler class Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Added formatting change Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> * Added PR review changes. Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Fixed typo in CHANGELOG. Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Requested changes. Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Requested changes again. Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> Added link in USER_GUIDE.md. Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com> --------- Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com>
1 parent e92eac2 commit 0cb345d

File tree

4 files changed

+287
-0
lines changed

4 files changed

+287
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
55
### Added
66
- Added pylint `line-too-long` and `invalid-name` ([#590](https://github.com/opensearch-project/opensearch-py/pull/590))
77
- Added pylint `pointless-statement` ([#611](https://github.com/opensearch-project/opensearch-py/pull/611))
8+
- Added a log collection guide ([#579](https://github.com/opensearch-project/opensearch-py/pull/579))
89
### Changed
910
### Deprecated
1011
### Removed

USER_GUIDE.md

+1
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ print(response)
158158
- [Making Raw JSON REST Requests](guides/json.md)
159159
- [Connection Classes](guides/connection_classes.md)
160160
- [Document Lifecycle](guides/document_lifecycle.md)
161+
- [Collecting Logs](guides/log_collection.md)
161162

162163
## Plugins
163164

guides/log_collection.md

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
- [Log Collection Guide](#log-collection-guide)
2+
- [Import Required Modules](#import-required-modules)
3+
- [Download and Start OpenSearch](#download-and-start-opensearch)
4+
- [Setup Connection with OpenSearch](#setup-connection-with-opensearch)
5+
- [Initialize Logger](#initialize-logger)
6+
- [Custom Handler For Logs](#custom-handler-for-logs)
7+
- [Create OpenSearch Handler and Add to Logger](#create-opensearch-handler-and-add-to-logger)
8+
- [Setup Asynchronous Logging Using Queues](#setup-asynchronous-logging-using-queues)
9+
- [Clean Up](#clean-up)
10+
- [Sample Code](#sample-code)
11+
12+
13+
## Log Collection Guide
14+
In this guide, we will look at how to collect logs from your application and send them to OpenSearch.
15+
16+
## Import Required Modules
17+
Let's import the required modules:
18+
19+
```python
20+
import logging
21+
import queue
22+
from datetime import datetime
23+
from logging.handlers import QueueHandler, QueueListener
24+
from typing import Any
25+
26+
import urllib3
27+
28+
from opensearchpy import OpenSearch
29+
30+
urllib3.disable_warnings()
31+
```
32+
33+
## Download and Start OpenSearch
34+
```
35+
docker pull opensearchproject/opensearch:latest
36+
```
37+
38+
```
39+
docker run -d -p 9200:9200 -p 9600:9600 --name opensearch_opensearch_1 -e "discovery.type=single-node" opensearchproject/opensearch:latest
40+
```
41+
42+
## Setup Connection with OpenSearch
43+
44+
Create a client instance:
45+
```python
46+
opensearch_client: Any = OpenSearch(
47+
"https://admin:admin@localhost:9200",
48+
use_ssl=True,
49+
verify_certs=False,
50+
ssl_show_warn=False,
51+
http_auth=("admin", "admin"),
52+
)
53+
```
54+
55+
## Initialize Logger
56+
Initialize a logger, named "OpenSearchLogs", that emits logs to OpenSearch, and a console handler, both set to the INFO level, are initialized. The console handler is then added to the logger. For every log line processed by this setup, a corresponding OpenSearch document is created. This approach supports structured and comprehensive logging because each document can include extensive metadata within it.
57+
58+
```python
59+
# Initialize a logger named "OpenSearchLogs" for OpenSearch & set log level to INFO
60+
print("Initializing logger...")
61+
os_logger = logging.getLogger("OpenSearchLogs")
62+
os_logger.setLevel(logging.INFO)
63+
64+
# Create a console handler
65+
console_handler = logging.StreamHandler()
66+
console_handler.setLevel(logging.INFO)
67+
68+
# Add console handler to the logger
69+
os_logger.addHandler(console_handler)
70+
```
71+
72+
## Custom Handler For Logs
73+
Define a custom handler that logs to OpenSearch:
74+
75+
```python
76+
class OpenSearchHandler(logging.Handler):
77+
# Initializer / Instance attributes
78+
def __init__(self, opensearch_client):
79+
logging.Handler.__init__(self)
80+
self.opensearch_client = opensearch_client
81+
82+
# Build index name (e.g., "logs-YYYY-MM-DD")
83+
def _build_index_name(self):
84+
return f"logs-{datetime.date(datetime.now())}"
85+
86+
# Emit logs to the OpenSearch cluster
87+
def emit(self, record):
88+
document = {
89+
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
90+
"name": record.name,
91+
"level": record.levelname,
92+
"message": record.getMessage(),
93+
"source": {
94+
"file": record.pathname,
95+
"line": record.lineno,
96+
"function": record.funcName,
97+
},
98+
"process": {
99+
"id": record.process,
100+
"name": record.processName
101+
},
102+
"thread": {
103+
"id": record.thread,
104+
"name": record.threadName
105+
},
106+
}
107+
108+
# Write the log entry to OpenSearch, handle exceptions
109+
self.opensearch_client.index(
110+
index=self._build_index_name(),
111+
body=document,
112+
)
113+
```
114+
115+
## Create OpenSearch Handler and Add to Logger
116+
Create an instance of OpenSearchHandler and add it to the logger:
117+
118+
```python
119+
print("Creating an instance of OpenSearchHandler and adding it to the logger...")
120+
# Create an instance of OpenSearchHandler and add it to the logger
121+
os_handler = OpenSearchHandler(opensearch_client)
122+
os_logger.addHandler(os_handler)
123+
```
124+
125+
## Setup Asynchronous Logging Using Queues
126+
Finally, let's setup asynchronous logging using Queues:
127+
128+
```python
129+
print("Setting up asynchronous logging using Queues...")
130+
# Setup asynchronous logging using Queues
131+
log_queue = queue.Queue(-1) # no limit on size
132+
os_queue_handler = QueueHandler(log_queue)
133+
os_queue_listener = QueueListener(log_queue, os_handler)
134+
135+
# Add queue handler to the logger
136+
os_logger.addHandler(os_queue_handler)
137+
138+
# Start listening on the queue using the os_queue_listener
139+
os_queue_listener.start()
140+
```
141+
142+
## Clean Up
143+
Finally, let's clean up by stopping the queue listener:
144+
145+
```python
146+
print("Cleaning up...")
147+
# Stop listening on the queue
148+
os_queue_listener.stop()
149+
print("Log Collection Guide has completed running")
150+
```
151+
152+
## Sample Code
153+
See [log_collection_sample.py](/samples/logging/log_collection_sample.py) for a working sample of the concepts in this guide. The script will create a logger named "OpenSearchLogs" and set the log level to INFO. It will then create an instance of OpenSearchHandler and add it to the logger. Finally, it will setup asynchronous logging using Queues and send a test log to the OpenSearch cluster.
154+
155+
Exptected Output From Running [log_collection_sample.py](/samples/logging/log_collection_sample.py):
156+
157+
```
158+
"""
159+
Running Log Collection Guide
160+
Setting up connection with OpenSearch cluster...
161+
Initializing logger...
162+
Creating an instance of OpenSearchHandler and adding it to the logger...
163+
Setting up asynchronous logging using Queues...
164+
Logger is set up and listener has started. Sending a test log...
165+
This is a test log message
166+
Cleaning up...
167+
Log Collection Guide has completed running
168+
"""
169+
```
+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#!/usr/bin/env python
2+
3+
# -*- coding: utf-8 -*-
4+
# SPDX-License-Identifier: Apache-2.0
5+
#
6+
# The OpenSearch Contributors require contributions made to
7+
# this file be licensed under the Apache-2.0 license or a
8+
# compatible open source license.
9+
#
10+
# Modifications Copyright OpenSearch Contributors. See
11+
# GitHub history for details.
12+
13+
import logging
14+
import queue
15+
from datetime import datetime
16+
from logging.handlers import QueueHandler, QueueListener
17+
from typing import Any
18+
19+
import urllib3
20+
21+
from opensearchpy import OpenSearch
22+
23+
urllib3.disable_warnings()
24+
25+
26+
def main() -> None:
27+
print("Collecting logs.")
28+
29+
# Create a console handler
30+
console_handler: logging.StreamHandler = logging.StreamHandler() # type: ignore
31+
console_handler.setLevel(logging.INFO)
32+
33+
# Setup connection with the OpenSearch cluster
34+
print("Setting up connection with OpenSearch cluster...")
35+
opensearch_client: Any = OpenSearch(
36+
"https://admin:admin@localhost:9200",
37+
use_ssl=True,
38+
verify_certs=False,
39+
ssl_show_warn=False,
40+
http_auth=("admin", "admin"),
41+
)
42+
43+
# Initialize a logger named "OpenSearchLogs" for OpenSearch
44+
print("Initializing logger...")
45+
os_logger: logging.Logger = logging.getLogger("OpenSearchLogs")
46+
os_logger.setLevel(logging.INFO)
47+
48+
# Add console handler to the logger
49+
os_logger.addHandler(console_handler)
50+
51+
# Define a custom handler that logs to OpenSearch
52+
class OpenSearchHandler(logging.Handler):
53+
# Initializer / Instance attributes
54+
def __init__(self, opensearch_client: Any) -> None:
55+
super().__init__()
56+
self.os_client = opensearch_client
57+
58+
# Build index name (e.g., "logs-YYYY-MM-DD")
59+
def _build_index_name(self) -> str:
60+
return f"logs-{datetime.date(datetime.now())}"
61+
62+
# Emit logs to the OpenSearch cluster
63+
def emit(self, record: logging.LogRecord) -> None:
64+
document = {
65+
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
66+
"name": record.name,
67+
"level": record.levelname,
68+
"message": record.getMessage(),
69+
"source": {
70+
"file": record.pathname,
71+
"line": record.lineno,
72+
"function": record.funcName,
73+
},
74+
"process": {"id": record.process, "name": record.processName},
75+
"thread": {"id": record.thread, "name": record.threadName},
76+
}
77+
78+
try:
79+
self.os_client.index(
80+
index=self._build_index_name(),
81+
body=document,
82+
)
83+
except Exception as e:
84+
print(f"Failed to send log to OpenSearch: {e}")
85+
logging.warning(f"Failed to send log to OpenSearch: {e}")
86+
raise
87+
88+
print("Creating an instance of OpenSearchHandler and adding it to the logger...")
89+
# Create an instance of OpenSearchHandler and add it to the logger
90+
os_handler: OpenSearchHandler = OpenSearchHandler(opensearch_client)
91+
os_logger.addHandler(os_handler)
92+
93+
print("Setting up asynchronous logging using Queues...")
94+
# Setup asynchronous logging using Queues
95+
log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1) # no limit on size
96+
os_queue_handler: logging.Handler = QueueHandler(log_queue)
97+
os_queue_listener: QueueListener = QueueListener(log_queue, os_handler)
98+
99+
# Add queue handler to the logger
100+
os_logger.addHandler(os_queue_handler)
101+
102+
# Start listening on the queue using the os_queue_listener
103+
os_queue_listener.start()
104+
105+
print("Logger is set up and listener has started. Sending a test log...")
106+
# Logging a test message
107+
os_logger.info("This is a test log message")
108+
109+
print("Cleaning up...")
110+
# Stop listening on the queue
111+
os_queue_listener.stop()
112+
print("Log Collection Guide has completed running")
113+
114+
115+
if __name__ == "__main__":
116+
main()

0 commit comments

Comments
 (0)