Skip to content

Commit

Permalink
fix: change the way Suppression handles it's context var. Cover edg…
Browse files Browse the repository at this point in the history
…e case with test.

fixes #56
  • Loading branch information
bodja committed Feb 25, 2025
1 parent 2e284c6 commit 6d3db52
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
33 changes: 22 additions & 11 deletions django_kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,38 @@ def __getattr__(self, name):
class Suppression(ContextDecorator):
"""context manager to help suppress producing messages to desired Kafka topics"""

_var = ContextVar(f"{__name__}.suppression", default=None)
_var = ContextVar(f"{__name__}.suppression")

@classmethod
def active(cls, topic: str):
"""returns if suppression is enabled for the given topic"""
topics = cls._var.get()
try:
topics = cls._var.get()
except LookupError:
# context var is not yet set, defaulting to empty list
topics = []

# topics will be None when suppress() is initialized without topics provided
if topics is None:
return True # all topics

return topic in topics

def __init__(self, topics: Optional[list[str]], deactivate=False):
current = self._var.get()
if current is None:
self._var.set([])
def __init__(self, topics: Optional[list[str]] = None, deactivate=False):
try:
topics_in_context = self._var.get()
except LookupError:
topics_in_context = []

if deactivate:
self.topics = []
elif topics is None or current is None:

elif topics is None or topics_in_context is None:
self.topics = None # indicates all topics

elif isinstance(topics, list):
self.topics = current + topics
self.topics = topics_in_context + topics

else:
raise ValueError(f"invalid producer suppression setting {topics}")

Expand All @@ -96,11 +107,11 @@ def __exit__(self, *args, **kwargs):

def suppress(topics: Optional[Callable | list[str]] = None):
if callable(topics):
return Suppression(None)(topics)
return Suppression()(topics)
return Suppression(topics)


def unsuppress(fn: Optional[Callable] = None):
if fn:
return Suppression(None, deactivate=True)(fn)
return Suppression(None, deactivate=True)
return Suppression(deactivate=True)(fn)
return Suppression(deactivate=True)
6 changes: 5 additions & 1 deletion django_kafka/tests/test_producers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@

from django.test import TestCase

from django_kafka.producer import Producer, suppress, unsuppress
from django_kafka.producer import Producer, Suppression, suppress, unsuppress


@mock.patch("django_kafka.producer.ConfluentProducer")
class ProducerSuppressTestCase(TestCase):
def test_suppression_active(self, mock_confluent_producer):
# suppress should not be active if it hasn't been called
self.assertFalse(Suppression.active('topicA'))

def test_suppress_all(self, mock_confluent_producer):
producer = Producer()

Expand Down

0 comments on commit 6d3db52

Please sign in to comment.