From 6960819f1c851102a8db51f5fa88f8b571bda1d0 Mon Sep 17 00:00:00 2001 From: Adam Kariv Date: Wed, 12 Jun 2024 11:25:38 +0300 Subject: [PATCH] Use asyncio.tothread to avoid blocking --- odds/backend/processor/resource_processor.py | 70 ++++++++++++-------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/odds/backend/processor/resource_processor.py b/odds/backend/processor/resource_processor.py index 82f36dd..4d181fa 100644 --- a/odds/backend/processor/resource_processor.py +++ b/odds/backend/processor/resource_processor.py @@ -51,6 +51,44 @@ def func(rows): yield row return func + + def validate_data(self, ctx, filename, stream): + dp, _ = DF.Flow( + DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True), + DF.update_resource(-1, name='data'), + DF.validate(on_error=DF.schema_validator.clear), + self.updater(ctx, lambda i: f'LOADED {i} ROWS TO DISK'), + DF.stream(stream) + ).process() + rts.set(ctx, f'VALIDATED {filename} to {stream.name}') + return dp + + def load_sample(self, ctx, stream_name): + data = DF.Flow( + DF.unstream(stream_name), + self.limiter(), + ).results(on_error=None)[0][0] + rts.set(ctx, f'READ DATA {len(data)} ROWS from {stream_name}') + return data + + def write_db(self, ctx, sqlite_filename, stream_name, data, resource): + sqlite_url = f'sqlite:///{sqlite_filename}' + engine = create_engine(sqlite_url) + DF.Flow( + DF.unstream(stream_name), + self.updater(ctx, lambda i: f'DUMPED {i} ROWS TO SQLITE'), + DF.dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine), + ).process() + rts.set(ctx, f'DUMPED {len(data)} ROWS from {resource.url} TO {sqlite_filename}') + with engine.connect() as conn: + # row count: + resource.row_count = conn.execute(text('SELECT COUNT(*) FROM data')).fetchone()[0] + # get the table's CREATE TABLE text: + resource.db_schema = conn.execute(text('SELECT sql FROM sqlite_master WHERE type="table" AND name="data"')).fetchone()[0] + resource.status_loaded = True + rts.set(ctx, f'SQLITE DATA {resource.url} HAS {resource.row_count} ROWS == {len(data)} ROWS') + return resource + async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatalog, ctx: str): if not ResourceProcessor.check_format(resource): return None @@ -93,25 +131,14 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal rts.set(ctx, f'DOWNLOADED {report} BYTES from {resource.url} to {filename}') rts.set(ctx, f'DOWNLOADED {total_size} BYTES from {resource.url} to {filename}') - dp, _ = DF.Flow( - DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True), - DF.update_resource(-1, name='data'), - DF.validate(on_error=DF.schema_validator.clear), - self.updater(ctx, lambda i: f'LOADED {i} ROWS TO DISK'), - DF.stream(stream) - ).process() - rts.set(ctx, f'VALIDATED {total_size} BYTES from {resource.url} to {stream.name}') + dp = await asyncio.to_thread(self.load_data, ctx, filename, stream) potential_fields = [ Field(name=field['name'], data_type=field['type']) for field in dp.resources[0].descriptor['schema']['fields'] ] - data = DF.Flow( - DF.unstream(stream.name), - self.limiter(), - ).results(on_error=None)[0][0] - rts.set(ctx, f'READ DATA {total_size} BYTES / {len(data)} ROWS from {resource.url}') + data = await asyncio.to_thread(self.load_sample, ctx, stream.name) if len(data) == 0: resource.loading_error = 'NO DATA' @@ -145,23 +172,10 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal return stream.close() + sqlite_filename = f'{TMP_DIR}/{rand}.sqlite' to_delete.append(sqlite_filename) - sqlite_url = f'sqlite:///{sqlite_filename}' - engine = create_engine(sqlite_url) - DF.Flow( - DF.unstream(stream.name), - self.updater(ctx, lambda i: f'DUMPED {i} ROWS TO SQLITE'), - DF.dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine), - ).process() - rts.set(ctx, f'DUMPED {total_size} BYTES / {len(data)} ROWS from {resource.url} TO {sqlite_filename}') - with engine.connect() as conn: - # row count: - resource.row_count = conn.execute(text('SELECT COUNT(*) FROM data')).fetchone()[0] - # get the table's CREATE TABLE text: - resource.db_schema = conn.execute(text('SELECT sql FROM sqlite_master WHERE type="table" AND name="data"')).fetchone()[0] - resource.status_loaded = True - rts.set(ctx, f'SQLITE DATA {resource.url} HAS {resource.row_count} ROWS == {len(data)} ROWS') + resource = await asyncio.to_thread(self.write_db, ctx, sqlite_filename, stream.name, data, resource) await store.storeDB(resource, dataset, sqlite_filename, ctx) except Exception as e: