From 50d5ee92902e55005c83d21845c84c13b4929d31 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Tue, 21 Jan 2025 17:35:09 +0100
Subject: [PATCH 1/6] Use generators in record splitting instead of lists.

---
 dabapush/Reader/NDJSONReader.py | 14 +++++++-------
 dabapush/Record.py              | 34 ++++++++++++++++------------------
 2 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/dabapush/Reader/NDJSONReader.py b/dabapush/Reader/NDJSONReader.py
index 32d42ae..29ff064 100644
--- a/dabapush/Reader/NDJSONReader.py
+++ b/dabapush/Reader/NDJSONReader.py
@@ -1,7 +1,7 @@
 """NDJSON Reader plug-in for dabapush"""
 
 # pylint: disable=R,I1101
 
-from typing import Iterator, List
+from typing import Iterator
 
 import ujson
@@ -14,10 +14,10 @@
 def read_and_split(
     record: Record,
     flatten_records: bool = False,
-) -> List[Record]:
+) -> Iterator[Record]:
    """Reads a file and splits it into records by line."""
    with record.payload.open("rt", encoding="utf8") as file:
-        children = [
+        children = (
            Record(
                uuid=f"{str(record.uuid)}:{str(line_number)}",
                payload=(
@@ -28,10 +28,10 @@ def read_and_split(
                source=record,
            )
            for line_number, line in enumerate(file)
-        ]
-        record.children.extend(children)
-
-        return children
+        )
+        for child in children:
+            record.children.append(child)
+            yield child
 
 
 class NDJSONReader(FileReader):
diff --git a/dabapush/Record.py b/dabapush/Record.py
index d668205..382118a 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -5,7 +5,7 @@
 # pylint: disable=R0917, R0913
 
 from datetime import datetime
-from typing import Any, Callable, Dict, List, Literal, Optional, Self, Union
+from typing import Any, Callable, Dict, Iterable, List, Literal, Optional, Self, Union
 from uuid import uuid4
 
 from loguru import logger as log
@@ -92,9 +92,9 @@ def split(
         self,
         key: Optional[str] = None,
         id_key: Optional[str] = None,
-        func: Optional[Callable[[Self, ...], List[Self]]] = None,
+        func: Optional[Callable[[Self, ...], Iterable[Self]]] = None,
         **kwargs,
-    ) -> List[Self]:
+    ) -> Iterable[Self]:
         """Splits the record based on either a keyword or a function.
         If a function is provided, it will be used to split the payload, even if
         you provide a key. If a key is provided, it will split the payload.
@@ -134,21 +134,19 @@ def _handle_key_split_(self, id_key, key):
         payload = self.payload  # Get the payload, the original payload
         # will be set to None to free memory.
-        if key not in payload:
-            return []
-        if not isinstance(payload[key], list):
-            return []
-        split_payload = [
-            Record(
-                **{
-                    "payload": value,
-                    "uuid": value.get(id_key) if id_key else uuid4().hex,
-                    "source": self,
-                }
+        if key in payload and isinstance(payload[key], list):
+            split_payload = (
+                Record(
+                    **{
+                        "payload": value,
+                        "uuid": value.get(id_key) if id_key else uuid4().hex,
+                        "source": self,
+                    }
+                )
+                for value in payload[key]
             )
-            for value in payload[key]
-        ]
-        self.children.extend(split_payload)
-        return split_payload
+            for child in split_payload:
+                self.children.append(child)
+                yield child
 
     def to_log(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]:
         """Return a loggable representation of the record."""

From 1388e938e818bcb2d9e1ff568c4e0d0d299e9500 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 09:43:52 +0100
Subject: [PATCH 2/6] Fix error in test_Record.py

---
 tests/test_Record.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/test_Record.py b/tests/test_Record.py
index ec69db9..9906274 100644
--- a/tests/test_Record.py
+++ b/tests/test_Record.py
@@ -36,7 +36,7 @@ def test_splitting_record():
     record = Record(payload, Path())
-    records = record.split("key")
+    records = list(record.split("key"))
 
     assert len(records) == 3
     for _record_ in records:
         assert _record_.uuid
         assert _record_.processed_at
@@ -57,7 +57,7 @@ def test_splitting_record_with_children_ids():
     record = Record(payload, Path())
-    records = record.split("key", id_key="id")
+    records = list(record.split("key", id_key="id"))
 
     assert len(records) == 3
     for n, _record_ in enumerate(records, 1):
         assert _record_.uuid == n
         assert _record_.processed_at
@@ -78,14 +78,14 @@ def test_splitting_record_without_key():
     record = Record(payload, Path())
     records = record.split("key2")
 
-    assert len(records) == 0
+    assert len(list(records)) == 0
 
 
 def test_splitting_record_without_payload():
     """Should not split a Record without a payload."""
     record = Record({}, Path())
     records = record.split("key")
-    assert len(records) == 0
+    assert len(list(records)) == 0
 
 
 def test_logging_record():

From 6df7525c7b57aa4efd58585d68281f7b0cae32a3 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 11:56:26 +0100
Subject: [PATCH 3/6] Refactor log-keeping

---
 dabapush/Reader/JSONReader.py     |  8 ++------
 dabapush/Reader/NDJSONReader.py   |  8 ++------
 dabapush/Reader/Reader.py         | 27 +--------------------------
 dabapush/Record.py                |  8 ++++----
 dabapush/Writer/Writer.py         | 40 ++++++++++++++++++++++++++++++-
 tests/Reader/test_JSONReader.py   | 31 +++-----------------------------
 tests/Reader/test_NDJSONReader.py | 32 --------------------------------
 tests/Writer/test_Writer.py       | 33 ++++++++++++++++++++++++++-----
 8 files changed, 79 insertions(+), 108 deletions(-)

diff --git a/dabapush/Reader/JSONReader.py b/dabapush/Reader/JSONReader.py
index 0dabe72..9282100 100644
--- a/dabapush/Reader/JSONReader.py
+++ b/dabapush/Reader/JSONReader.py
@@ -11,7 +11,6 @@
 from .Reader import FileReader
 
-
 
 class JSONReader(FileReader):
     """Reader to read directories containing multiple json files.
     It matches files in the path-tree against the pattern and reads the
@@ -36,14 +35,11 @@ def read(self) -> Iterator[Record]:
             record = Record(
                 uuid=f"{str(file_record.uuid)}",
                 payload=(
-                    parsed
-                    if not self.config.flatten_dicts
-                    else flatten(parsed)
+                    parsed if not self.config.flatten_dicts else flatten(parsed)
                 ),
                 source=file_record,
             )
-            if record not in self.back_log:
-                yield record
+            yield record
 
 
 class JSONReaderConfiguration(ReaderConfiguration):
diff --git a/dabapush/Reader/NDJSONReader.py b/dabapush/Reader/NDJSONReader.py
index 29ff064..a8ac217 100644
--- a/dabapush/Reader/NDJSONReader.py
+++ b/dabapush/Reader/NDJSONReader.py
@@ -53,13 +53,9 @@ def read(self) -> Iterator[Record]:
         """reads multiple NDJSON files and emits them line by line"""
 
         for file_record in self.records:
-            filtered_records = filter(
-                lambda x: x not in self.back_log,
-                file_record.split(
-                    func=read_and_split, flatten_records=self.config.flatten_dicts
-                ),
+            yield from file_record.split(
+                func=read_and_split, flatten_records=self.config.flatten_dicts
             )
-            yield from filtered_records
 
 
 class NDJSONReaderConfiguration(ReaderConfiguration):
diff --git a/dabapush/Reader/Reader.py b/dabapush/Reader/Reader.py
index 09a81fd..94df09a 100644
--- a/dabapush/Reader/Reader.py
+++ b/dabapush/Reader/Reader.py
@@ -4,8 +4,6 @@
 from pathlib import Path
 from typing import Iterator
 
-import ujson
-from loguru import logger as log
 from tqdm.auto import tqdm
 
 from ..Configuration.ReaderConfiguration import ReaderConfiguration
@@ -31,12 +29,6 @@ def __init__(self, config: ReaderConfiguration):
             be a subclass of ReaderConfiguration.
         """
         self.config = config
-        self.back_log = []
-        # initialize file log
-        if not Path(".dabapush/").exists():
-            Path(".dabapush/").mkdir()
-
-        self.log_path = Path(f".dabapush/{config.name}.jsonl")
 
     @abc.abstractmethod
     def read(self) -> Iterator[Record]:
@@ -77,28 +69,11 @@ def read(self) -> Iterator[Record]:
     @property
     def records(self) -> Iterator[Record]:
         """Generator for all files matching the pattern in the read_path."""
-        if self.log_path.exists():
-            log.debug(
-                f"Found log file for {self.config.name} at {self.log_path}. Loading..."
-            )
-            with self.log_path.open("rt", encoding="utf8") as f:
-                self.back_log = [Record(**ujson.loads(_)) for _ in f.readlines()]
-        else:
-            self.log_path.touch()
-
         yield from (
             Record(
                 uuid=str(a),
                 payload=a,
-                event_handlers={"on_done": [self.log]},
+                # event_handlers={"on_done": [self.log]},
             )
             for a in tqdm(list(Path(self.config.read_path).rglob(self.config.pattern)))
         )
-
-    def log(self, record: Record):
-        """Log the record to the persistent record log file."""
-        with self.log_path.open("a", encoding="utf8") as f:
-            for sub_record in record.walk_tree(only_leafs=True):
-                ujson.dump(sub_record.to_log(), f)
-                f.write("\n")
-        log.debug(f"Done with {record.uuid}")
diff --git a/dabapush/Record.py b/dabapush/Record.py
index 382118a..d4a3273 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -185,10 +185,10 @@ def done(self):
         # Signal parent that this record is done
         self._state_ = "done"
         log.debug(f"Record {self.uuid} is set as done.")
-        if self.source:
-            self.source.signal_done()
-            log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
-        self.__dispatch_event__("on_done")
+        # if self.source:
+        #     self.source.signal_done()
+        #     log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
+        # self.__dispatch_event__("on_done")
 
     def signal_done(self):
         """Signal that a child record is done."""
diff --git a/dabapush/Writer/Writer.py b/dabapush/Writer/Writer.py
index 7efd0d5..7baf827 100644
--- a/dabapush/Writer/Writer.py
+++ b/dabapush/Writer/Writer.py
@@ -6,8 +6,10 @@
 """
 
 import abc
+from pathlib import Path
 from typing import Iterator, List
 
+import ujson
 from loguru import logger as log
 
 from ..Configuration.WriterConfiguration import WriterConfiguration
@@ -27,10 +29,31 @@ def __init__(self, config: WriterConfiguration):
         self.config = config
         self.buffer: List[Record] = []
+        self.back_log: List[Record] = []
+        # initialize file log
+        if not Path(".dabapush/").exists():
+            Path(".dabapush/").mkdir()
+
+        self.log_path = Path(f".dabapush/{config.name}.jsonl")
+        if self.log_path.exists():
+            log.debug(
+                f"Found log file for {self.config.name} at {self.log_path}. Loading..."
+            )
+            with self.log_path.open("rt", encoding="utf8") as f:
+                self.back_log = [
+                    Record(**ujson.loads(_))  # pylint: disable=I1101
+                    for _ in f.readlines()
+                ]
+        else:
+            self.log_path.touch()
+        self.log_file = self.log_path.open(  # pylint: disable=R1732
+            "a", encoding="utf8"
+        )
 
     def __del__(self):
         """Ensures the buffer is flushed before the object is destroyed."""
         self._trigger_persist()
+        self.log_file.close()
 
     def write(self, queue: Iterator[Record]) -> None:
         """Consumes items from the provided queue.
 
@@ -38,16 +61,20 @@ def write(self, queue: Iterator[Record]) -> None:
             queue (Iterator[Record]): Items to be consumed.
         """
         for item in queue:
+            if item in self.back_log:
+                continue
             self.buffer.append(item)
             if len(self.buffer) >= self.config.chunk_size:
                 self._trigger_persist()
 
     def _trigger_persist(self):
         self.persist()
-        log.debug(f"Persisted {self.config.chunk_size} records. Setting to done.")
+        log.debug(f"Persisted {len(self.buffer)} records. Setting to done.")
         for record in self.buffer:
             log.debug(f"Setting record {record.uuid} as done.")
             record.done()
+            self.log(record)
+        self.log_file.flush()
         self.buffer = []
 
     @abc.abstractmethod
@@ -72,3 +99,14 @@ def id(self):
             str: The ID of the writer.
""" return self.config.id + + def log(self, record: Record): + """Log the record to the persistent record log file.""" + ujson.dump(record.to_log(), self.log_file) # pylint: disable=I1101 + self.log_file.write("\n") + + # with self.log_path.open("a", encoding="utf8") as f: + # for sub_record in record.walk_tree(only_leafs=True): + # ujson.dump(sub_record.to_log(), f) + # f.write("\n") + log.debug(f"Done with {record.uuid}") diff --git a/tests/Reader/test_JSONReader.py b/tests/Reader/test_JSONReader.py index e4e74b8..797ccf3 100644 --- a/tests/Reader/test_JSONReader.py +++ b/tests/Reader/test_JSONReader.py @@ -7,16 +7,18 @@ from dabapush.Reader.JSONReader import JSONReader, JSONReaderConfiguration + @pytest.fixture def input_json_directory(isolated_test_dir): "Pytest fixture creating a directory with 20 json files." - for idx in range(10,30): + for idx in range(10, 30): file_path = isolated_test_dir / f"test_{idx}.json" with file_path.open("wt") as out_file: json.dump({"test_key": idx}, out_file) out_file.write("\n") return isolated_test_dir + def test_read(input_json_directory: Path): # pylint: disable=W0621 """Should read the data from the file.""" reader = JSONReader( @@ -28,30 +30,3 @@ def test_read(input_json_directory: Path): # pylint: disable=W0621 print(record) assert record.processed_at assert record.payload == {"test_key": int(record.uuid[-7:-5])} - - -def test_read_with_backlog(input_json_directory: Path): # pylint: disable=W0621 - """Should only read the new data.""" - reader = JSONReaderConfiguration( - "test", read_path=str(input_json_directory.resolve()), pattern="*.json" - ).get_instance() - - def wrapper(): - n = None - for n, record in enumerate(reader.read()): - record.done() - return n or 0 - - n = wrapper() - - assert n + 1 == 20 - - reader2 = JSONReaderConfiguration( - "test", read_path=str(input_json_directory.resolve()) - ).get_instance() - - records2 = list(reader2.read()) - log_path = input_json_directory / ".dabapush/test.jsonl" - assert log_path.exists() - assert len(reader2.back_log) == 20 - assert len(records2) == 0 diff --git a/tests/Reader/test_NDJSONReader.py b/tests/Reader/test_NDJSONReader.py index 124af67..d567b40 100644 --- a/tests/Reader/test_NDJSONReader.py +++ b/tests/Reader/test_NDJSONReader.py @@ -30,35 +30,3 @@ def test_read(isolated_test_dir: Path, data): # pylint: disable=W0621 for n, record in enumerate(records): assert record.processed_at assert record.payload == data[n] - - -def test_read_with_backlog(isolated_test_dir: Path, data): # pylint: disable=W0621 - """Should only read the new data.""" - reader = NDJSONReaderConfiguration( - "test", read_path=str(isolated_test_dir.resolve()), pattern="*.ndjson" - ).get_instance() - file_path = isolated_test_dir / "test.ndjson" - with file_path.open("wt") as file: - for line in data: - json.dump(line, file) - file.write("\n") - - def wrapper(): - n = None - for n, record in enumerate(reader.read()): - record.done() - return n or 0 - - n = wrapper() - - assert n + 1 == 20 - - reader2 = NDJSONReaderConfiguration( - "test", read_path=str(isolated_test_dir.resolve()) - ).get_instance() - - records2 = list(reader2.read()) - log_path = isolated_test_dir / ".dabapush/test.jsonl" - assert log_path.exists() - assert len(reader2.back_log) == 20 - assert len(records2) == 0 diff --git a/tests/Writer/test_Writer.py b/tests/Writer/test_Writer.py index 7852289..abb7298 100644 --- a/tests/Writer/test_Writer.py +++ b/tests/Writer/test_Writer.py @@ -1,6 +1,6 @@ """Tests for the Writer class.""" -# pylint: disable=W0621, 
C0114, C0115, C0116 +# pylint: disable=W0212, W0621, C0114, C0115, C0116 from pytest import fixture from dabapush.Configuration.WriterConfiguration import WriterConfiguration @@ -9,7 +9,8 @@ @fixture -def writer() -> Writer: +def writer(monkeypatch, tmp_path) -> Writer: + monkeypatch.chdir(tmp_path) config = WriterConfiguration(name="test") return Writer(config) @@ -36,15 +37,37 @@ def __init__(self, config): def persist(self): self.persisted_data.extend((_.payload for _ in self.buffer)) - self.buffer = [] -def test_writer_persist_method(): +def test_writer_persist_method(monkeypatch, tmp_path): """Should persist the buffer.""" + monkeypatch.chdir(tmp_path) + config = WriterConfiguration(name="test", id=1, chunk_size=3) writer = MyTestWriter(config) queue = (Record(uuid=str(i), payload=i) for i in range(10)) writer.write(queue) - writer.persist() + writer._trigger_persist() assert writer.persisted_data == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] assert not writer.buffer + + +def test_writer_persist_method_with_backlog(monkeypatch, tmp_path): + """Should persist the buffer.""" + monkeypatch.chdir(tmp_path) + + config = WriterConfiguration(name="test", id=1, chunk_size=3) + queue = [Record(uuid=str(i), payload=i) for i in range(10)] + writer = MyTestWriter(config) + writer.write((_ for _ in queue)) + writer._trigger_persist() + + del writer + + writer = MyTestWriter(config) + writer._trigger_persist() + + assert not writer.persisted_data + assert not writer.buffer + assert len(writer.back_log) == 10 + assert all(_.uuid == str(i) for i, _ in enumerate(writer.back_log)) From 38d4b046bbf1adba6426d1948cf0bafe099f92d4 Mon Sep 17 00:00:00 2001 From: Philipp Kessling Date: Wed, 22 Jan 2025 12:03:44 +0100 Subject: [PATCH 4/6] Use fixture to isolate test runs. --- tests/Writer/test_Writer.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/Writer/test_Writer.py b/tests/Writer/test_Writer.py index abb7298..6a6c188 100644 --- a/tests/Writer/test_Writer.py +++ b/tests/Writer/test_Writer.py @@ -1,6 +1,6 @@ """Tests for the Writer class.""" -# pylint: disable=W0212, W0621, C0114, C0115, C0116 +# pylint: disable=W0212, W0621, W0613, C0114, C0115, C0116 from pytest import fixture from dabapush.Configuration.WriterConfiguration import WriterConfiguration @@ -9,8 +9,8 @@ @fixture -def writer(monkeypatch, tmp_path) -> Writer: - monkeypatch.chdir(tmp_path) +def writer(isolated_test_dir) -> Writer: + config = WriterConfiguration(name="test") return Writer(config) @@ -39,9 +39,8 @@ def persist(self): self.persisted_data.extend((_.payload for _ in self.buffer)) -def test_writer_persist_method(monkeypatch, tmp_path): +def test_writer_persist_method(isolated_test_dir): """Should persist the buffer.""" - monkeypatch.chdir(tmp_path) config = WriterConfiguration(name="test", id=1, chunk_size=3) writer = MyTestWriter(config) @@ -52,9 +51,8 @@ def test_writer_persist_method(monkeypatch, tmp_path): assert not writer.buffer -def test_writer_persist_method_with_backlog(monkeypatch, tmp_path): +def test_writer_persist_method_with_backlog(isolated_test_dir): """Should persist the buffer.""" - monkeypatch.chdir(tmp_path) config = WriterConfiguration(name="test", id=1, chunk_size=3) queue = [Record(uuid=str(i), payload=i) for i in range(10)] From 66176e3c8c41eb50693a79e244ddaa4d5a0da331 Mon Sep 17 00:00:00 2001 From: Philipp Kessling Date: Wed, 22 Jan 2025 12:08:44 +0100 Subject: [PATCH 5/6] Remove commented code. 
---
 dabapush/Reader/Reader.py | 1 -
 dabapush/Writer/Writer.py | 4 ----
 2 files changed, 5 deletions(-)

diff --git a/dabapush/Reader/Reader.py b/dabapush/Reader/Reader.py
index 94df09a..4fd2efe 100644
--- a/dabapush/Reader/Reader.py
+++ b/dabapush/Reader/Reader.py
@@ -73,7 +73,6 @@ def records(self) -> Iterator[Record]:
             Record(
                 uuid=str(a),
                 payload=a,
-                # event_handlers={"on_done": [self.log]},
             )
             for a in tqdm(list(Path(self.config.read_path).rglob(self.config.pattern)))
         )
diff --git a/dabapush/Writer/Writer.py b/dabapush/Writer/Writer.py
index 7baf827..22e4e74 100644
--- a/dabapush/Writer/Writer.py
+++ b/dabapush/Writer/Writer.py
@@ -105,8 +105,4 @@ def log(self, record: Record):
         ujson.dump(record.to_log(), self.log_file)  # pylint: disable=I1101
         self.log_file.write("\n")
 
-        # with self.log_path.open("a", encoding="utf8") as f:
-        #     for sub_record in record.walk_tree(only_leafs=True):
-        #         ujson.dump(sub_record.to_log(), f)
-        #         f.write("\n")
         log.debug(f"Done with {record.uuid}")

From db303b13e5fe559616f8e5f72402c13a22be1c72 Mon Sep 17 00:00:00 2001
From: Philipp Kessling
Date: Wed, 22 Jan 2025 12:25:46 +0100
Subject: [PATCH 6/6] Do not signal parent records.

---
 dabapush/Record.py | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/dabapush/Record.py b/dabapush/Record.py
index d4a3273..5c789ab 100644
--- a/dabapush/Record.py
+++ b/dabapush/Record.py
@@ -185,19 +185,6 @@ def done(self):
         # Signal parent that this record is done
         self._state_ = "done"
         log.debug(f"Record {self.uuid} is set as done.")
-        # if self.source:
-        #     self.source.signal_done()
-        #     log.debug(f"Signaled parent {self.source.uuid} of record {self.uuid}.")
-        # self.__dispatch_event__("on_done")
-
-    def signal_done(self):
-        """Signal that a child record is done."""
-        # If all children are done, so is the parent.
-        _children_status_ = [child.state == "done" for child in self.children]
-        log.debug(f"Signaled that children of {self.uuid} is done.")
-        if all(_children_status_):
-            self.done()
-            log.debug(f"Record {self.uuid} is done.")
 
     def destroy(self):
         """Destroy the record and all its children."""
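
Series note (illustrative, not part of any commit): after PATCH 1/6, Record.split() returns a lazy generator instead of a list. No payload is split and no children are registered on the parent until the result is iterated, which is also why PATCH 2/6 has to materialise the result before calling len(). A minimal sketch of the new semantics, assuming Record's positional constructor and `children` attribute behave as used in tests/test_Record.py:

    from pathlib import Path

    from dabapush.Record import Record

    record = Record({"key": [{"id": 1}, {"id": 2}, {"id": 3}]}, Path())
    children = record.split("key", id_key="id")

    # Nothing has run yet: the generator body only executes on iteration,
    # so the parent has no children registered at this point.
    assert len(record.children) == 0

    materialised = list(children)  # drives the generator to completion
    assert len(materialised) == 3
    assert len(record.children) == 3
    assert [child.uuid for child in materialised] == [1, 2, 3]

    # Generators are single-use: a second pass yields nothing, so consumers
    # must materialise the result if they need to iterate more than once.
    assert list(children) == []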
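
Series note (illustrative, not part of any commit): PATCH 3/6 through PATCH 5/6 move deduplication out of the readers and into Writer. Every persisted record is appended to .dabapush/<name>.jsonl, that file is reloaded into Writer.back_log on construction, and write() skips records that are already present. A sketch of the round trip under the same assumptions as tests/Writer/test_Writer.py; SketchWriter is a hypothetical stand-in analogous to MyTestWriter, and the snippet should run from a scratch working directory since the log lands in ./.dabapush/:

    from typing import List

    from dabapush.Configuration.WriterConfiguration import WriterConfiguration
    from dabapush.Record import Record
    from dabapush.Writer.Writer import Writer


    class SketchWriter(Writer):
        def __init__(self, config: WriterConfiguration):
            super().__init__(config)
            self.persisted_data: List = []

        def persist(self):
            # Collect payloads of the current chunk, like MyTestWriter does.
            self.persisted_data.extend(record.payload for record in self.buffer)


    config = WriterConfiguration(name="demo", id=1, chunk_size=3)

    first = SketchWriter(config)
    first.write(Record(uuid=str(i), payload=i) for i in range(5))
    del first  # __del__ persists the remainder and logs every record

    second = SketchWriter(config)  # reloads .dabapush/demo.jsonl into back_log
    assert len(second.back_log) == 5

    second.write(Record(uuid=str(i), payload=i) for i in range(5))
    assert second.persisted_data == []  # duplicates were filtered via back_log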