Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add consumer and topic names to the log. #52

Merged
merged 3 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.5.13"
current_version = "0.5.14"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
serialize = ["{major}.{minor}.{patch}"]
search = "{current_version}"
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.5.14 (2025-01-27)
* Improve logging. Add consumer and topic names to the error message.

## 0.5.13 (2025-01-07)
* Upgrade `confluent-kafka` from `v2.6.2` to `v2.7.0` as recommended by authors.

Expand Down
2 changes: 1 addition & 1 deletion django_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

logger = logging.getLogger(__name__)

__version__ = "0.5.13"
__version__ = "0.5.14"

__all__ = [
"autodiscover",
Expand Down
26 changes: 19 additions & 7 deletions django_kafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def blocking_retry(
if retry_settings.can_retry(attempt, exc):
until = retry_settings.get_retry_time(attempt)
self.pause_partition(msg, until)
self.log_error(exc)
self.log_error(msg, exc_info=exc)
return True
return False

Expand Down Expand Up @@ -153,22 +153,34 @@ def handle_exception(self, msg: "cimpl.Message", exc: Exception) -> bool:
retried, blocking = self.retry_msg(msg, exc)
if not retried:
self.dead_letter_msg(msg, exc)
self.log_error(exc)
self.log_error(msg, exc_info=exc)
return True
return not blocking

def get_topic(self, msg: "cimpl.Message") -> "TopicConsumer":
return self.topics.get(topic_name=msg.topic())

def log_error(self, error):
logger.error(error, exc_info=True)
def log_error(
self,
msg: Optional["cimpl.Message"] = None,
exc_info: bool | Exception = False,
):
error = f"'{self.__class__.__module__}.{self.__class__.__name__} failed'"
if msg:
topic = self.get_topic(msg)
error = f"{error} on '{topic.__class__.__module__}.{topic.__class__.__name__}'"

if msg_error := msg.error():
error = f"{error}\nMessage error: '{msg_error}'"

logger.error(error, exc_info=exc_info)

def consume(self, msg):
self.get_topic(msg).consume(msg)

def process_message(self, msg: "cimpl.Message"):
if msg_error := msg.error():
self.log_error(msg_error)
if msg.error():
self.log_error(msg)
return

try:
Expand Down Expand Up @@ -197,7 +209,7 @@ def run(self):
if (msg := self.poll()) is not None:
self.process_message(msg)
except Exception as exc:
self.log_error(exc)
self.log_error(exc_info=exc)
raise
finally:
self.stop()
Expand Down
6 changes: 3 additions & 3 deletions django_kafka/tests/test_consumer/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class SomeConsumer(Consumer):
# checks msg had error before processing
msg.error.assert_called_once_with()
# error handler was triggered
log_error.assert_called_once_with(msg.error.return_value)
log_error.assert_called_once_with(msg)
# Topic.consume is not called
consumer.topics[msg.topic()].consume.assert_not_called()
# Consumer.commit_offset is not called
Expand Down Expand Up @@ -269,7 +269,7 @@ class SomeConsumer(Consumer):

consumer.retry_msg.assert_called_once_with(msg, exc)
consumer.dead_letter_msg.assert_called_once_with(msg, exc)
consumer.log_error.assert_called_once_with(exc)
consumer.log_error.assert_called_once_with(msg, exc_info=exc)

def test_blocking_retry(self):
retry_time = timezone.now()
Expand All @@ -289,7 +289,7 @@ class SomeConsumer(Consumer):
retried = consumer.blocking_retry(retry_settings, msg_mock, exc)

consumer.pause_partition.assert_called_once_with(msg_mock, retry_time)
consumer.log_error.assert_called_once_with(exc)
consumer.log_error.assert_called_once_with(msg_mock, exc_info=exc)
self.assertEqual(retried, True)

@patch("django_kafka.consumer.Consumer.log_error")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "django-kafka"
version = "0.5.13"
version = "0.5.14"
dependencies = [
"django>=4.2,<6.0",
"confluent-kafka[avro, schemaregistry]==2.7.0"
Expand Down
Loading