From 50d5ee92902e55005c83d21845c84c13b4929d31 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Tue, 21 Jan 2025 17:35:09 +0100
Subject: [PATCH 1/6] Use generators in record splitting instead of lists.
---
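Notes: switching from list comprehensions to generator expressions makes
record splitting lazy — children are created, registered on the parent,
and handed downstream one at a time instead of being materialized all at
once. Because read_and_split now contains a yield, its whole body is
deferred, so the `with` block keeps the file handle open for as long as
the generator is being consumed. A minimal sketch of the pattern
(illustrative names, not the project's API):

    from pathlib import Path
    from typing import Iterator

    def split_lines(path: Path) -> Iterator[str]:
        # Nothing runs until the caller iterates: the file is opened on
        # the first next() and closed when the generator is exhausted.
        with path.open("rt", encoding="utf8") as file:
            for number, line in enumerate(file):
                yield f"{number}:{line.rstrip()}"

One consequence for callers: the result is single-use, so code that needs
both the length and the items has to materialize it first with list(...).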
dabapush/Reader/NDJSONReader.py | 14 +++++++-------
dabapush/Record.py | 34 ++++++++++++++++-----------------
2 files changed, 23 insertions(+), 25 deletions(-)
diff --git a/dabapush/Reader/NDJSONReader.py b/dabapush/Reader/NDJSONReader.py
index 32d42ae..29ff064 100644
--- a/dabapush/Reader/NDJSONReader.py
+++ b/dabapush/Reader/NDJSONReader.py
@@ -1,7 +1,7 @@
"""NDJSON Writer plug-in for dabapush"""
# pylint: disable=R,I1101
-from typing import Iterator, List
+from typing import Iterator
import ujson
@@ -14,10 +14,10 @@
def read_and_split(
record: Record,
flatten_records: bool = False,
-) -> List[Record]:
+) -> Iterator[Record]:
"""Reads a file and splits it into records by line."""
with record.payload.open("rt", encoding="utf8") as file:
- children = [
+ children = (
Record(
uuid=f"{str(record.uuid)}:{str(line_number)}",
payload=(
@@ -28,10 +28,10 @@ def read_and_split(
source=record,
)
for line_number, line in enumerate(file)
- ]
- record.children.extend(children)
-
- return children
+ )
+ for child in children:
+ record.children.append(child)
+ yield child
class NDJSONReader(FileReader):
diff --git a/dabapush/Record.py b/dabapush/Record.py
index d668205..382118a 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -5,7 +5,7 @@
# pylint: disable=R0917, R0913
from datetime import datetime
-from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union
+from typing import Any, Callable, Dict, Iterable, List, Literal, Optional, Self, Union
from uuid import uuid4
from loguru import logger as log
@@ -92,9 +92,9 @@ def split(
self,
key: Optional[str] = None,
id_key: Optional[str] = None,
- func: Optional[Callable[[Self, ...], List[Self]]] = None,
+    func: Optional[Callable[..., Iterable[Self]]] = None,
**kwargs,
- ) -> List[Self]:
+ ) -> Iterable[Self]:
"""Splits the record bases on either a keyword or a function. If a function is provided,
it will be used to split the payload, even if you provide a key. If a key is provided, it
will split the payload.
@@ -134,22 +134,20 @@ def split(
def _handle_key_split_(self, id_key, key):
payload = self.payload # Get the payload, the original payload
# will be set to None to free memory.
- if key not in payload:
- return []
- if not isinstance(payload[key], list):
- return []
-        split_payload = [
-            Record(
-                **{
-                    "payload": value,
-                    "uuid": value.get(id_key) if id_key else uuid4().hex,
-                    "source": self,
-                }
-            )
-            for value in payload[key]
-        ]
-        self.children.extend(split_payload)
-        return split_payload
+        if key in payload and isinstance(payload[key], list):
+            split_payload = (
+                Record(
+                    **{
+                        "payload": value,
+                        "uuid": value.get(id_key) if id_key else uuid4().hex,
+                        "source": self,
+                    }
+                )
+                for value in payload[key]
+            )
+            for child in split_payload:
+                self.children.append(child)
+                yield child
def to_log(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]:
"""Return a loggable representation of the record."""
From 1388e938e818bcb2d9e1ff568c4e0d0d299e9500 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 09:43:52 +0100
Subject: [PATCH 2/6] Fix generator-handling errors in tests/test_Record.py.
---
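Notes: Record.split() now returns a one-shot generator, so len() no longer
applies directly — and draining the generator for a length check leaves
nothing for a follow-up loop. The updated tests therefore materialize the
records once before asserting. A minimal sketch of the pitfall:

    gen = (n for n in range(3))      # one-shot, like Record.split()
    assert len(list(gen)) == 3       # list(...) drains every item
    assert list(gen) == []           # a second pass sees nothing

    records = list(n for n in range(3))  # materialize once instead
    assert len(records) == 3
    assert sum(records) == 3             # later passes still work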
tests/test_Record.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/tests/test_Record.py b/tests/test_Record.py
index ec69db9..9906274 100644
--- a/tests/test_Record.py
+++ b/tests/test_Record.py
@@ -36,7 +36,7 @@ def test_splitting_record():
record = Record(payload, Path())
-    records = record.split("key")
-    assert len(records) == 3
+    records = list(record.split("key"))
+    assert len(records) == 3
for _record_ in records:
assert _record_.uuid
assert _record_.processed_at
@@ -57,7 +57,7 @@ def test_splitting_record_with_children_ids():
record = Record(payload, Path())
-    records = record.split("key", id_key="id")
-    assert len(records) == 3
+    records = list(record.split("key", id_key="id"))
+    assert len(records) == 3
for n, _record_ in enumerate(records, 1):
assert _record_.uuid == n
assert _record_.processed_at
@@ -78,14 +78,14 @@ def test_splitting_record_without_key():
record = Record(payload, Path())
records = record.split("key2")
- assert len(records) == 0
+ assert len(list(records)) == 0
def test_splitting_record_without_payload():
"""Should not split a Record without a payload."""
record = Record({}, Path())
records = record.split("key")
- assert len(records) == 0
+ assert len(list(records)) == 0
def test_logging_record():
From 6df7525c7b57aa4efd58585d68281f7b0cae32a3 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 11:56:26 +0100
Subject: [PATCH 3/6] Refactor log-keeping: move the persistent back-log from Reader to Writer.
---
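Notes: ownership of the persistent back-log moves from Reader to Writer.
Readers now always emit everything they find, while the Writer skips
records it already persisted in a previous run (loaded from
.dabapush/<name>.jsonl on start-up) and appends to an open log-file
handle as each record is marked done. A sketch of the log round-trip
this relies on, assuming Record.to_log() returns a JSON-serializable
dict and Record(**d) rebuilds a comparable record:

    import ujson
    from pathlib import Path

    from dabapush.Record import Record

    def append_to_log(log_file, record: Record) -> None:
        # One JSON object per line; the caller flushes once per chunk.
        ujson.dump(record.to_log(), log_file)
        log_file.write("\n")

    def load_back_log(log_path: Path) -> list[Record]:
        # Rebuild the records persisted by previous runs.
        with log_path.open("rt", encoding="utf8") as f:
            return [Record(**ujson.loads(line)) for line in f]

Note that the `item in self.back_log` check relies on equality between a
live Record and one rebuilt from the log.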
dabapush/Reader/JSONReader.py | 8 ++-----
dabapush/Reader/NDJSONReader.py | 8 ++-----
dabapush/Reader/Reader.py | 27 +--------------------
dabapush/Record.py | 8 +++----
dabapush/Writer/Writer.py | 40 ++++++++++++++++++++++++++++++-
tests/Reader/test_JSONReader.py | 31 +++---------------------
tests/Reader/test_NDJSONReader.py | 32 -------------------------
tests/Writer/test_Writer.py | 33 +++++++++++++++++++++----
8 files changed, 79 insertions(+), 108 deletions(-)
diff --git a/dabapush/Reader/JSONReader.py b/dabapush/Reader/JSONReader.py
index 0dabe72..9282100 100644
--- a/dabapush/Reader/JSONReader.py
+++ b/dabapush/Reader/JSONReader.py
@@ -11,7 +11,6 @@
from .Reader import FileReader
-
class JSONReader(FileReader):
"""Reader to read ready to read directories containing multiple json files.
It matches files in the path-tree against the pattern and reads the
@@ -36,14 +35,11 @@ def read(self) -> Iterator[Record]:
record = Record(
uuid=f"{str(file_record.uuid)}",
payload=(
- parsed
- if not self.config.flatten_dicts
- else flatten(parsed)
+ parsed if not self.config.flatten_dicts else flatten(parsed)
),
source=file_record,
)
- if record not in self.back_log:
- yield record
+ yield record
class JSONReaderConfiguration(ReaderConfiguration):
diff --git a/dabapush/Reader/NDJSONReader.py b/dabapush/Reader/NDJSONReader.py
index 29ff064..a8ac217 100644
--- a/dabapush/Reader/NDJSONReader.py
+++ b/dabapush/Reader/NDJSONReader.py
@@ -53,13 +53,9 @@ def read(self) -> Iterator[Record]:
"""reads multiple NDJSON files and emits them line by line"""
for file_record in self.records:
- filtered_records = filter(
- lambda x: x not in self.back_log,
- file_record.split(
- func=read_and_split, flatten_records=self.config.flatten_dicts
- ),
+ yield from file_record.split(
+ func=read_and_split, flatten_records=self.config.flatten_dicts
)
- yield from filtered_records
class NDJSONReaderConfiguration(ReaderConfiguration):
diff --git a/dabapush/Reader/Reader.py b/dabapush/Reader/Reader.py
index 09a81fd..94df09a 100644
--- a/dabapush/Reader/Reader.py
+++ b/dabapush/Reader/Reader.py
@@ -4,8 +4,6 @@
from pathlib import Path
from typing import Iterator
-import ujson
-from loguru import logger as log
from tqdm.auto import tqdm
from ..Configuration.ReaderConfiguration import ReaderConfiguration
@@ -33,12 +31,6 @@ def __init__(self, config: ReaderConfiguration):
be a subclass of ReaderConfiguration.
"""
self.config = config
- self.back_log = []
- # initialize file log
- if not Path(".dabapush/").exists():
- Path(".dabapush/").mkdir()
-
- self.log_path = Path(f".dabapush/{config.name}.jsonl")
@abc.abstractmethod
def read(self) -> Iterator[Record]:
@@ -77,28 +69,11 @@ def read(self) -> Iterator[Record]:
@property
def records(self) -> Iterator[Record]:
"""Generator for all files matching the pattern in the read_path."""
- if self.log_path.exists():
- log.debug(
- f"Found log file for {self.config.name} at {self.log_path}. Loading..."
- )
- with self.log_path.open("rt", encoding="utf8") as f:
- self.back_log = [Record(**ujson.loads(_)) for _ in f.readlines()]
- else:
- self.log_path.touch()
-
yield from (
Record(
uuid=str(a),
payload=a,
- event_handlers={"on_done": [self.log]},
+ # event_handlers={"on_done": [self.log]},
)
for a in tqdm(list(Path(self.config.read_path).rglob(self.config.pattern)))
)
-
- def log(self, record: Record):
- """Log the record to the persistent record log file."""
- with self.log_path.open("a", encoding="utf8") as f:
- for sub_record in record.walk_tree(only_leafs=True):
- ujson.dump(sub_record.to_log(), f)
- f.write("\n")
- log.debug(f"Done with {record.uuid}")
diff --git a/dabapush/Record.py b/dabapush/Record.py
index 382118a..d4a3273 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -185,10 +185,10 @@ def done(self):
# Signal parent that this record is done
self._state_ = "done"
log.debug(f"Record {self.uuid} is set as done.")
- if self.source:
- self.source.signal_done()
- log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
- self.__dispatch_event__("on_done")
+ # if self.source:
+ # self.source.signal_done()
+ # log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
+ # self.__dispatch_event__("on_done")
def signal_done(self):
"""Signal that a child record is done."""
diff --git a/dabapush/Writer/Writer.py b/dabapush/Writer/Writer.py
index 7efd0d5..7baf827 100644
--- a/dabapush/Writer/Writer.py
+++ b/dabapush/Writer/Writer.py
@@ -6,8 +6,10 @@
"""
import abc
+from pathlib import Path
from typing import Iterator, List
+import ujson
from loguru import logger as log
from ..Configuration.WriterConfiguration import WriterConfiguration
@@ -27,10 +29,31 @@ def __init__(self, config: WriterConfiguration):
self.config = config
self.buffer: List[Record] = []
+ self.back_log: List[Record] = []
+ # initialize file log
+ if not Path(".dabapush/").exists():
+ Path(".dabapush/").mkdir()
+
+ self.log_path = Path(f".dabapush/{config.name}.jsonl")
+ if self.log_path.exists():
+ log.debug(
+ f"Found log file for {self.config.name} at {self.log_path}. Loading..."
+ )
+ with self.log_path.open("rt", encoding="utf8") as f:
+ self.back_log = [
+ Record(**ujson.loads(_)) # pylint: disable=I1101
+ for _ in f.readlines()
+ ]
+ else:
+ self.log_path.touch()
+ self.log_file = self.log_path.open( # pylint: disable=R1732
+ "a", encoding="utf8"
+ )
def __del__(self):
"""Ensures the buffer is flushed before the object is destroyed."""
self._trigger_persist()
+ self.log_file.close()
def write(self, queue: Iterator[Record]) -> None:
"""Consumes items from the provided queue.
@@ -39,16 +62,20 @@ def write(self, queue: Iterator[Record]) -> None:
queue (Iterator[Record]): Items to be consumed.
"""
for item in queue:
+ if item in self.back_log:
+ continue
self.buffer.append(item)
if len(self.buffer) >= self.config.chunk_size:
self._trigger_persist()
def _trigger_persist(self):
self.persist()
- log.debug(f"Persisted {self.config.chunk_size} records. Setting to done.")
+ log.debug(f"Persisted {len(self.buffer)} records. Setting to done.")
for record in self.buffer:
log.debug(f"Setting record {record.uuid} as done.")
record.done()
+ self.log(record)
+ self.log_file.flush()
self.buffer = []
@abc.abstractmethod
@@ -72,3 +99,14 @@ def id(self):
str: The ID of the writer.
"""
return self.config.id
+
+ def log(self, record: Record):
+ """Log the record to the persistent record log file."""
+ ujson.dump(record.to_log(), self.log_file) # pylint: disable=I1101
+ self.log_file.write("\n")
+
+ # with self.log_path.open("a", encoding="utf8") as f:
+ # for sub_record in record.walk_tree(only_leafs=True):
+ # ujson.dump(sub_record.to_log(), f)
+ # f.write("\n")
+ log.debug(f"Done with {record.uuid}")
diff --git a/tests/Reader/test_JSONReader.py b/tests/Reader/test_JSONReader.py
index e4e74b8..797ccf3 100644
--- a/tests/Reader/test_JSONReader.py
+++ b/tests/Reader/test_JSONReader.py
@@ -7,16 +7,18 @@
from dabapush.Reader.JSONReader import JSONReader, JSONReaderConfiguration
+
@pytest.fixture
def input_json_directory(isolated_test_dir):
"Pytest fixture creating a directory with 20 json files."
- for idx in range(10,30):
+ for idx in range(10, 30):
file_path = isolated_test_dir / f"test_{idx}.json"
with file_path.open("wt") as out_file:
json.dump({"test_key": idx}, out_file)
out_file.write("\n")
return isolated_test_dir
+
def test_read(input_json_directory: Path): # pylint: disable=W0621
"""Should read the data from the file."""
reader = JSONReader(
@@ -28,30 +30,3 @@ def test_read(input_json_directory: Path): # pylint: disable=W0621
print(record)
assert record.processed_at
assert record.payload == {"test_key": int(record.uuid[-7:-5])}
-
-
-def test_read_with_backlog(input_json_directory: Path): # pylint: disable=W0621
- """Should only read the new data."""
- reader = JSONReaderConfiguration(
- "test", read_path=str(input_json_directory.resolve()), pattern="*.json"
- ).get_instance()
-
- def wrapper():
- n = None
- for n, record in enumerate(reader.read()):
- record.done()
- return n or 0
-
- n = wrapper()
-
- assert n + 1 == 20
-
- reader2 = JSONReaderConfiguration(
- "test", read_path=str(input_json_directory.resolve())
- ).get_instance()
-
- records2 = list(reader2.read())
- log_path = input_json_directory / ".dabapush/test.jsonl"
- assert log_path.exists()
- assert len(reader2.back_log) == 20
- assert len(records2) == 0
diff --git a/tests/Reader/test_NDJSONReader.py b/tests/Reader/test_NDJSONReader.py
index 124af67..d567b40 100644
--- a/tests/Reader/test_NDJSONReader.py
+++ b/tests/Reader/test_NDJSONReader.py
@@ -30,35 +30,3 @@ def test_read(isolated_test_dir: Path, data): # pylint: disable=W0621
for n, record in enumerate(records):
assert record.processed_at
assert record.payload == data[n]
-
-
-def test_read_with_backlog(isolated_test_dir: Path, data): # pylint: disable=W0621
- """Should only read the new data."""
- reader = NDJSONReaderConfiguration(
- "test", read_path=str(isolated_test_dir.resolve()), pattern="*.ndjson"
- ).get_instance()
- file_path = isolated_test_dir / "test.ndjson"
- with file_path.open("wt") as file:
- for line in data:
- json.dump(line, file)
- file.write("\n")
-
- def wrapper():
- n = None
- for n, record in enumerate(reader.read()):
- record.done()
- return n or 0
-
- n = wrapper()
-
- assert n + 1 == 20
-
- reader2 = NDJSONReaderConfiguration(
- "test", read_path=str(isolated_test_dir.resolve())
- ).get_instance()
-
- records2 = list(reader2.read())
- log_path = isolated_test_dir / ".dabapush/test.jsonl"
- assert log_path.exists()
- assert len(reader2.back_log) == 20
- assert len(records2) == 0
diff --git a/tests/Writer/test_Writer.py b/tests/Writer/test_Writer.py
index 7852289..abb7298 100644
--- a/tests/Writer/test_Writer.py
+++ b/tests/Writer/test_Writer.py
@@ -1,6 +1,6 @@
"""Tests for the Writer class."""
-# pylint: disable=W0621, C0114, C0115, C0116
+# pylint: disable=W0212, W0621, C0114, C0115, C0116
from pytest import fixture
from dabapush.Configuration.WriterConfiguration import WriterConfiguration
@@ -9,7 +9,8 @@
@fixture
-def writer() -> Writer:
+def writer(monkeypatch, tmp_path) -> Writer:
+ monkeypatch.chdir(tmp_path)
config = WriterConfiguration(name="test")
return Writer(config)
@@ -36,15 +37,37 @@ def __init__(self, config):
def persist(self):
self.persisted_data.extend((_.payload for _ in self.buffer))
- self.buffer = []
-def test_writer_persist_method():
+def test_writer_persist_method(monkeypatch, tmp_path):
"""Should persist the buffer."""
+ monkeypatch.chdir(tmp_path)
+
config = WriterConfiguration(name="test", id=1, chunk_size=3)
writer = MyTestWriter(config)
queue = (Record(uuid=str(i), payload=i) for i in range(10))
writer.write(queue)
- writer.persist()
+ writer._trigger_persist()
assert writer.persisted_data == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert not writer.buffer
+
+
+def test_writer_persist_method_with_backlog(monkeypatch, tmp_path):
+ """Should persist the buffer."""
+ monkeypatch.chdir(tmp_path)
+
+ config = WriterConfiguration(name="test", id=1, chunk_size=3)
+ queue = [Record(uuid=str(i), payload=i) for i in range(10)]
+ writer = MyTestWriter(config)
+ writer.write((_ for _ in queue))
+ writer._trigger_persist()
+
+ del writer
+
+ writer = MyTestWriter(config)
+ writer._trigger_persist()
+
+ assert not writer.persisted_data
+ assert not writer.buffer
+ assert len(writer.back_log) == 10
+ assert all(_.uuid == str(i) for i, _ in enumerate(writer.back_log))
From 38d4b046bbf1adba6426d1948cf0bafe099f92d4 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 12:03:44 +0100
Subject: [PATCH 4/6] Use fixture to isolate test runs.
---
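Notes: the fixture and both tests previously repeated
monkeypatch.chdir(tmp_path) by hand; a shared fixture removes the
duplication and guarantees each test writes its .dabapush/ log beneath a
fresh directory. The project's conftest.py is not part of this series; a
hypothetical equivalent of isolated_test_dir could look like:

    import pytest

    @pytest.fixture
    def isolated_test_dir(monkeypatch, tmp_path):
        # Run the test with its CWD inside a fresh tmp directory so the
        # relative .dabapush/ path never leaks between test runs.
        monkeypatch.chdir(tmp_path)
        return tmp_path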
tests/Writer/test_Writer.py | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/tests/Writer/test_Writer.py b/tests/Writer/test_Writer.py
index abb7298..6a6c188 100644
--- a/tests/Writer/test_Writer.py
+++ b/tests/Writer/test_Writer.py
@@ -1,6 +1,6 @@
"""Tests for the Writer class."""
-# pylint: disable=W0212, W0621, C0114, C0115, C0116
+# pylint: disable=W0212, W0621, W0613, C0114, C0115, C0116
from pytest import fixture
from dabapush.Configuration.WriterConfiguration import WriterConfiguration
@@ -9,8 +9,8 @@
@fixture
-def writer(monkeypatch, tmp_path) -> Writer:
- monkeypatch.chdir(tmp_path)
+def writer(isolated_test_dir) -> Writer:
+
config = WriterConfiguration(name="test")
return Writer(config)
@@ -39,9 +39,8 @@ def persist(self):
self.persisted_data.extend((_.payload for _ in self.buffer))
-def test_writer_persist_method(monkeypatch, tmp_path):
+def test_writer_persist_method(isolated_test_dir):
"""Should persist the buffer."""
- monkeypatch.chdir(tmp_path)
config = WriterConfiguration(name="test", id=1, chunk_size=3)
writer = MyTestWriter(config)
@@ -52,9 +51,8 @@ def test_writer_persist_method(monkeypatch, tmp_path):
assert not writer.buffer
-def test_writer_persist_method_with_backlog(monkeypatch, tmp_path):
+def test_writer_persist_method_with_backlog(isolated_test_dir):
"""Should persist the buffer."""
- monkeypatch.chdir(tmp_path)
config = WriterConfiguration(name="test", id=1, chunk_size=3)
queue = [Record(uuid=str(i), payload=i) for i in range(10)]
From 66176e3c8c41eb50693a79e244ddaa4d5a0da331 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 12:08:44 +0100
Subject: [PATCH 5/6] Remove commented-out code.
---
dabapush/Reader/Reader.py | 1 -
dabapush/Writer/Writer.py | 4 ----
2 files changed, 5 deletions(-)
diff --git a/dabapush/Reader/Reader.py b/dabapush/Reader/Reader.py
index 94df09a..4fd2efe 100644
--- a/dabapush/Reader/Reader.py
+++ b/dabapush/Reader/Reader.py
@@ -73,7 +73,6 @@ def records(self) -> Iterator[Record]:
Record(
uuid=str(a),
payload=a,
- # event_handlers={"on_done": [self.log]},
)
for a in tqdm(list(Path(self.config.read_path).rglob(self.config.pattern)))
)
diff --git a/dabapush/Writer/Writer.py b/dabapush/Writer/Writer.py
index 7baf827..22e4e74 100644
--- a/dabapush/Writer/Writer.py
+++ b/dabapush/Writer/Writer.py
@@ -105,8 +105,4 @@ def log(self, record: Record):
ujson.dump(record.to_log(), self.log_file) # pylint: disable=I1101
self.log_file.write("\n")
- # with self.log_path.open("a", encoding="utf8") as f:
- # for sub_record in record.walk_tree(only_leafs=True):
- # ujson.dump(sub_record.to_log(), f)
- # f.write("\n")
log.debug(f"Done with {record.uuid}")
From db303b13e5fe559616f8e5f72402c13a22be1c72 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 12:25:46 +0100
Subject: [PATCH 6/6] Do not signal parent records.
---
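Notes: with the Writer owning the persistent log since patch 3,
completion no longer has to bubble up the record tree, so the upward
signalling path is deleted outright. After this patch done() reduces to
marking the record itself; a sketch of the remaining behaviour:

    def done(self):
        # Mark only this record; parents are no longer notified and no
        # on_done event is dispatched.
        self._state_ = "done"
        log.debug(f"Record {self.uuid} is set as done.")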
dabapush/Record.py | 13 -------------
1 file changed, 13 deletions(-)
diff --git a/dabapush/Record.py b/dabapush/Record.py
index d4a3273..5c789ab 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -185,19 +185,6 @@ def done(self):
# Signal parent that this record is done
self._state_ = "done"
log.debug(f"Record {self.uuid} is set as done.")
- # if self.source:
- # self.source.signal_done()
- # log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
- # self.__dispatch_event__("on_done")
-
- def signal_done(self):
- """Signal that a child record is done."""
- # If all children are done, so is the parent.
- _children_status_ = [child.state == "done" for child in self.children]
- log.debug(f"Signaled that children of {self.uuid} is done.")
- if all(_children_status_):
- self.done()
- log.debug(f"Record {self.uuid} is done.")
def destroy(self):
"""Destroy the record and all its children."""