Skip to content

Commit

Permalink
Merge pull request #54 from RegioHelden/53-suppress-message-serializa…
Browse files Browse the repository at this point in the history
…tion

fix: Producer.suppress should also suppress message serialization, refs #53
  • Loading branch information
stefan-cardnell-rh authored Feb 6, 2025
2 parents cf972f2 + 8055919 commit 9d86b65
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.5.14"
current_version = "0.5.15"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
serialize = ["{major}.{minor}.{patch}"]
search = "{current_version}"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ Used by `django_kafka.connect.connector.Connector` to initialize `django_kafka.c
### `producer.suppress`
Use the `producer.suppress` decorator and context manager to suppress the producing of messages generated by the `Producer` class during a particular context.
Use the `producer.suppress` function decorator and context manager to suppress the producing of messages generated by the `Producer` class during a particular context.
```python
from django_kafka import producer
Expand Down
2 changes: 1 addition & 1 deletion django_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

logger = logging.getLogger(__name__)

__version__ = "0.5.14"
__version__ = "0.5.15"

__all__ = [
"autodiscover",
Expand Down
13 changes: 13 additions & 0 deletions django_kafka/tests/test_topic/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from confluent_kafka.serialization import MessageField
from django.test import TestCase

from django_kafka import producer
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.topic import TopicConsumer, TopicProducer

Expand Down Expand Up @@ -157,6 +158,18 @@ def test_serialize_unknown_field(
mock_value_serializer.assert_not_called()
mock_key_serializer.assert_not_called()

@patch("django_kafka.topic.TopicProducer.serialize")
@patch("django_kafka.kafka.producer")
def test_produce_suppression(self, mock_kafka_producer, mock_topic_serialize):
key = "key"
value = "message value"

with producer.suppress():
self.topic_producer.produce(value, key=key)

mock_topic_serialize.assert_not_called()
mock_kafka_producer.produce.assert_not_called()

def test_context(self):
fields = [MessageField.VALUE, MessageField.KEY]
headers = {"header-1": "header-1-value"}
Expand Down
4 changes: 4 additions & 0 deletions django_kafka/topic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from django_kafka import kafka
from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Suppression

if TYPE_CHECKING:
from confluent_kafka import cimpl
Expand Down Expand Up @@ -67,6 +68,9 @@ def serialize(
raise DjangoKafkaError(f"Unsupported serialization field {field}.")

def produce(self, value: any, **kwargs):
if Suppression.active(self.name):
return # do not serialize if producing is suppressed

key_serializer_kwargs = kwargs.pop("key_serializer_kwargs", {}) or {}
value_serializer_kwargs = kwargs.pop("value_serializer_kwargs", {}) or {}
headers = kwargs.get("headers")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "django-kafka"
version = "0.5.14"
version = "0.5.15"
dependencies = [
"django>=4.2,<6.0",
"confluent-kafka[avro, schemaregistry]==2.7.0"
Expand Down

0 comments on commit 9d86b65

Please sign in to comment.