Skip to content

Commit

Permalink
Merge pull request #55 from lociii/ruff-pipeline closes #18
Browse files Browse the repository at this point in the history
Renovate, ruff fixes and ruff pipeline
  • Loading branch information
bodja authored Feb 7, 2025
2 parents 9d86b65 + 89a820e commit e684422
Show file tree
Hide file tree
Showing 31 changed files with 306 additions and 178 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,29 @@ on:
- pull_request

jobs:
lint:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: 3.12

- name: Check code
uses: astral-sh/ruff-action@v3
with:
args: "check"
version-file: "example/requirements.txt"

- name: Check format
uses: astral-sh/ruff-action@v3
with:
args: "format --check"
version-file: "example/requirements.txt"

test:
runs-on: ubuntu-latest

Expand Down
3 changes: 3 additions & 0 deletions .ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,6 @@ max-public-methods = 50

[format]
skip-magic-trailing-comma = false

[lint.pyflakes]
extend-generics = ["django_kafka.registry.Registry"]
8 changes: 2 additions & 6 deletions django_kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from multiprocessing.pool import Pool
from typing import Optional, TYPE_CHECKING
from typing import Optional

from confluent_kafka.schema_registry import SchemaRegistryClient
from django.utils.functional import cached_property
Expand All @@ -12,17 +12,13 @@
from django_kafka.registry import ConnectorsRegistry, ConsumersRegistry
from django_kafka.retry.settings import RetrySettings

if TYPE_CHECKING:
from django_kafka.connect.connector import Connector
from django_kafka.consumer import Consumer

logger = logging.getLogger(__name__)

__version__ = "0.5.15"

__all__ = [
"autodiscover",
"DjangoKafka",
"autodiscover",
"kafka",
]

Expand Down
17 changes: 9 additions & 8 deletions django_kafka/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"CONNECT_AUTH": None,
# kwargs for `urllib3.util.retry.Retry` initialization
"CONNECT_RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call
"CONNECT_RETRY": {
"connect": 5,
"read": 5,
"status": 5,
"backoff_factor": 0.5,
"status_forcelist": [502, 503, 504],
},
# `django_kafka.connect.client.KafkaConnectSession` would pass this
# value to every request method call
"CONNECT_REQUESTS_TIMEOUT": 30,
"CONNECTOR_NAME_PREFIX": "",
}
Expand Down
22 changes: 13 additions & 9 deletions django_kafka/connect/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import Optional

import requests
from requests.adapters import HTTPAdapter
from requests.auth import AuthBase
from urllib3.util import Retry
from requests.adapters import HTTPAdapter

from django_kafka.exceptions import DjangoKafkaError

Expand All @@ -13,8 +13,8 @@ def __init__(
self,
host: str,
auth: Optional[tuple | AuthBase] = None,
retry: dict = None,
timeout: int = None,
retry: Optional[dict] = None,
timeout: Optional[int] = None,
):
super().__init__()
self.auth = auth
Expand All @@ -41,8 +41,8 @@ def __init__(
self,
host: str,
auth: Optional[tuple | AuthBase] = None,
retry: dict = None,
timeout: int = None,
retry: Optional[dict] = None,
timeout: Optional[int] = None,
):
self._requests = KafkaConnectSession(host, auth, retry, timeout)

Expand All @@ -53,12 +53,16 @@ def delete(self, connector_name: str):
return self._requests.delete(f"/connectors/{connector_name}")

def validate(self, config: dict):
if not config.get('connector.class'):
raise DjangoKafkaError("'connector.class' config is required for validation.")
if not config.get("connector.class"):
raise DjangoKafkaError(
"'connector.class' config is required for validation.",
)

connector_class_name = config.get("connector.class").rsplit(".", 1)[-1]
response = self._requests.put(f"/connector-plugins/{connector_class_name}/config/validate", json=config)
return response
return self._requests.put(
f"/connector-plugins/{connector_class_name}/config/validate",
json=config,
)

def connector_status(self, connector_name: str):
"""
Expand Down
9 changes: 6 additions & 3 deletions django_kafka/connect/connector.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from abc import ABC, abstractmethod
from enum import StrEnum
from http import HTTPStatus

from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.connect.client import KafkaConnectClient
from django_kafka.exceptions import DjangoKafkaError

__all__ = [
"Connector",
Expand All @@ -17,8 +18,10 @@ class ConnectorStatus(StrEnum):
UNASSIGNED: The connector/task has not yet been assigned to a worker.
RUNNING: The connector/task is running.
PAUSED: The connector/task has been administratively paused.
FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).
FAILED: The connector/task has failed (usually by raising an exception, which
is reported in the status output).
"""

UNASSIGNED = "UNASSIGNED"
RUNNING = "RUNNING"
PAUSED = "PAUSED"
Expand Down Expand Up @@ -54,7 +57,7 @@ def __init__(self):
def delete(self) -> bool:
response = self.client.delete(self.name)

if response.status_code == 404:
if response.status_code == HTTPStatus.NOT_FOUND:
return False

if not response.ok:
Expand Down
18 changes: 9 additions & 9 deletions django_kafka/connect/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ class Meta:
abstract = True
base_manager_name = "objects"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._reset_kafka_skip = False # set True for DB fetched instances in from_db

def __setattr__(self, key, value):
super().__setattr__(key, value)
if key == "kafka_skip":
self._reset_kafka_skip = False

def save(
self,
force_insert=False,
Expand Down Expand Up @@ -70,15 +79,6 @@ def from_db(cls, *args, **kwargs):
instance._reset_kafka_skip = True
return instance

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._reset_kafka_skip = False # set True for DB fetched instances in from_db

def __setattr__(self, key, value):
super().__setattr__(key, value)
if key == "kafka_skip":
self._reset_kafka_skip = False

def refresh_from_db(self, *args, **kwargs):
super().refresh_from_db(*args, **kwargs)
self._reset_kafka_skip = True
10 changes: 6 additions & 4 deletions django_kafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,16 @@ def get_topic(self, msg: "cimpl.Message") -> "TopicConsumer":
return self.topics.get(topic_name=msg.topic())

def log_error(
self,
msg: Optional["cimpl.Message"] = None,
exc_info: bool | Exception = False,
self,
msg: Optional["cimpl.Message"] = None,
exc_info: bool | Exception = False,
):
error = f"'{self.__class__.__module__}.{self.__class__.__name__} failed'"
if msg:
topic = self.get_topic(msg)
error = f"{error} on '{topic.__class__.__module__}.{topic.__class__.__name__}'"
error = (
f"{error} on '{topic.__class__.__module__}.{topic.__class__.__name__}'"
)

if msg_error := msg.error():
error = f"{error}\nMessage error: '{msg_error}'"
Expand Down
3 changes: 2 additions & 1 deletion django_kafka/consumer/managers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections.abc import Iterator
from datetime import datetime
from typing import TYPE_CHECKING, Iterator
from typing import TYPE_CHECKING

from confluent_kafka import TopicPartition
from django.utils import timezone
Expand Down
5 changes: 4 additions & 1 deletion django_kafka/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Optional


class DjangoKafkaError(Exception):
def __init__(self, *args, context: any = None, **kwargs):
def __init__(self, *args, context: Optional[any] = None, **kwargs):
super().__init__(*args, **kwargs)
self.context = context
9 changes: 7 additions & 2 deletions django_kafka/management/commands/errors.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from collections.abc import Iterable
from functools import wraps
from typing import Callable, Type
from typing import Callable


def substitute_error(errors: Iterable[Type[Exception]], substitution: Type[Exception]) -> Callable:
def substitute_error(
errors: Iterable[type[Exception]],
substitution: type[Exception],
) -> Callable:
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except tuple(errors) as original_error:
raise substitution(original_error) from original_error

return wrapper

return decorator
15 changes: 11 additions & 4 deletions django_kafka/management/commands/kafka_connect.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from http import HTTPStatus

from django.core.management import CommandError
from django.core.management.base import BaseCommand
Expand All @@ -23,7 +24,8 @@ def add_arguments(self, parser):
type=str,
default=None,
nargs="?",
help="Python path to the connector class(es). Processes all if not provided.",
help="Python path to the connector class(es). "
"Processes all if not provided.",
)
parser.add_argument(
"--list",
Expand All @@ -47,14 +49,16 @@ def add_arguments(self, parser):
"--check-status",
action="store_true",
default=False,
help="Check status of connectors. Currently RUNNING status is considered as success.",
help="Check status of connectors. Currently RUNNING "
"status is considered as success.",
)
parser.add_argument(
"--ignore-failures",
action="store_true",
default=False,
help="The command wont fail if failures were detected. "
"By default if any failures were detected the command exist with error status.",
"By default if any failures were detected the "
"command exist with error status.",
)

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -161,7 +165,10 @@ def _connector_is_running(self, connector: Connector):
try:
status = connector.status()
except DjangoKafkaError as error:
if isinstance(error.context, Response) and error.context.status_code == 404:
if (
isinstance(error.context, Response)
and error.context.status_code == HTTPStatus.NOT_FOUND
):
# retry: on 404 as some delays are expected
raise

Expand Down
5 changes: 4 additions & 1 deletion django_kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __getattr__(self, name):
class Suppression(ContextDecorator):
"""context manager to help suppress producing messages to desired Kafka topics"""

_var = ContextVar(f"{__name__}.suppression", default=[])
_var = ContextVar(f"{__name__}.suppression", default=None)

@classmethod
def active(cls, topic: str):
Expand All @@ -74,6 +74,9 @@ def active(cls, topic: str):

def __init__(self, topics: Optional[list[str]], deactivate=False):
current = self._var.get()
if current is None:
self._var.set([])

if deactivate:
self.topics = []
elif topics is None or current is None:
Expand Down
19 changes: 9 additions & 10 deletions django_kafka/registry.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from typing import Generic, TYPE_CHECKING, Type, TypeVar
from typing import TYPE_CHECKING, Generic, TypeVar

from django_kafka.exceptions import DjangoKafkaError

if TYPE_CHECKING:
from django_kafka.consumer import Consumer
from django_kafka.connect.connector import Connector
from django_kafka.consumer import Consumer


T = TypeVar('T')
T = TypeVar("T")


class Registry(Generic[T]):
def __init__(self):
self._classes: dict[str, Type[T]] = {}
self._classes: dict[str, type[T]] = {}

def __call__(self):
def add_to_registry(cls: Type[T]) -> Type[T]:
def add_to_registry(cls: type[T]) -> type[T]:
self.register(cls)
return cls

Expand All @@ -30,10 +30,10 @@ def __getitem__(self, key: str):
def __iter__(self):
yield from self._classes.keys()

def get_key(self, cls: Type[T]) -> str:
def get_key(self, cls: type[T]) -> str:
return f"{cls.__module__}.{cls.__name__}"

def register(self, cls: Type[T]):
def register(self, cls: type[T]):
key = self.get_key(cls)
if key in self._classes:
raise DjangoKafkaError(f"`{key}` is already registered.")
Expand All @@ -46,11 +46,10 @@ def get_key(self, cls) -> str:


class ConsumersRegistry(Registry["Consumer"]):

def register(self, cls):
from django_kafka.retry.consumer import RetryConsumer

super().register(cls)

if retry_consumer_cls := RetryConsumer.build(cls):
self._classes[f"{self.get_key(cls)}.retry"] = retry_consumer_cls
Loading

0 comments on commit e684422

Please sign in to comment.