diff --git a/opendata_module/opmon_opendata/api/helpers.py b/opendata_module/opmon_opendata/api/helpers.py
index dfff756..cccdeff 100644
--- a/opendata_module/opmon_opendata/api/helpers.py
+++ b/opendata_module/opmon_opendata/api/helpers.py
@@ -60,13 +60,15 @@ def generate_ndjson_stream(postgres: PostgreSQL_LogManager, date: datetime,
     with GzipFile(fileobj=gzip_buffer, mode='wb') as gzip_file:
         count = 0
         buffer_size = settings['opendata'].get('stream-buffer-lines', DEFAULT_STREAM_BUFFER_LINES)
-        for row in data_cursor:
+        gzip_file.write(b'[\n')
+        for idx, row in enumerate(data_cursor):
+            if idx > 0:
+                gzip_file.write(b',\n')
             json_obj = {column_name: row[column_idx] for column_idx, column_name in enumerate(column_names)}
             # Must manually convert Postgres dates to string to be compatible with JSON format
             for date_column in date_columns:
                 json_obj[date_column] = datetime.strftime(json_obj[date_column], '%Y-%m-%d')
             gzip_file.write(bytes(json.dumps(json_obj), 'utf-8'))
-            gzip_file.write(b'\n')
             count += 1
             if count == buffer_size:
                 count = 0
@@ -78,6 +80,8 @@ def generate_ndjson_stream(postgres: PostgreSQL_LogManager, date: datetime,
                 # Empty buffer to free memory
                 gzip_buffer.truncate(0)
                 gzip_buffer.seek(0)
+        gzip_file.write(b'\n]')
+        # Final data gets written when GzipFile is closed
 
     yield gzip_buffer.getvalue()
 
diff --git a/opendata_module/opmon_opendata/tests/test_helpers.py b/opendata_module/opmon_opendata/tests/test_helpers.py
new file mode 100644
index 0000000..79b885c
--- /dev/null
+++ b/opendata_module/opmon_opendata/tests/test_helpers.py
@@ -0,0 +1,72 @@
+#
+# The MIT License
+# Copyright (c) 2021- Nordic Institute for Interoperability Solutions (NIIS)
+# Copyright (c) 2017-2020 Estonian Information System Authority (RIA)
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+import gzip
+import json
+import io
+import pytest
+
+from datetime import date, datetime
+from opmon_opendata.api import helpers
+
+
+SETTINGS = {
+    'opendata': {}
+}
+
+
+@pytest.fixture
+def postgres(mocker):
+    return mocker.Mock()
+
+
+@pytest.mark.parametrize(
+    "column_types, data_cursor, expected_output",
+    [
+        # Test case: Multiple results
+        (
+            [('foo', 'integer'), ('bar', 'character varying'), ('baz', 'date')],
+            [('1', 'aaa', date(2025, 1, 1)), ('2', 'bbb', date(2025, 2, 2))],
+            [{'foo': '1', 'bar': 'aaa', 'baz': '2025-01-01'},
+             {'foo': '2', 'bar': 'bbb', 'baz': '2025-02-02'}]
+        ),
+        # Test case: No results
+        (
+            [('foo', 'integer')],
+            [],
+            []
+        )
+    ]
+)
+def test_generate_ndjson_stream_creates_correct_json(postgres, column_types, data_cursor, expected_output):
+    postgres.get_column_names_and_types.return_value = column_types
+    postgres.get_data_cursor.return_value = data_cursor
+    gzipped_file_stream = helpers.generate_ndjson_stream(postgres, datetime.now(), [], [], [], SETTINGS)
+    decompressed_data = decompress_gzip(gzipped_file_stream)
+    assert json.loads(decompressed_data) == expected_output
+
+def decompress_gzip(gzipped_file_stream):
+    compressed_data = b"".join(gzipped_file_stream)
+    with gzip.GzipFile(fileobj=io.BytesIO(compressed_data), mode="rb") as f:
+        decompressed_data = f.read().decode()
+    return decompressed_data