From dffcbfe6d9606a806d6b772304018b684d6103f8 Mon Sep 17 00:00:00 2001
From: Adam Kariv
Date: Thu, 13 Jun 2024 14:28:36 +0300
Subject: [PATCH] Add http timeout + max fields processing

---
 odds/backend/processor/resource_processor.py | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/odds/backend/processor/resource_processor.py b/odds/backend/processor/resource_processor.py
index bd9b766..f4c0998 100644
--- a/odds/backend/processor/resource_processor.py
+++ b/odds/backend/processor/resource_processor.py
@@ -25,6 +25,7 @@ class ResourceProcessor:
     ALLOWED_FORMATS = ['csv', 'xlsx', 'xls']
     MISSING_VALUES = ['None', 'NULL', 'N/A', 'NA', 'NAN', 'NaN', 'nan', '-']
     BIG_FILE_SIZE = 10000000
+    MAX_FIELDS = 1000
 
     @staticmethod
     def check_format(resource: Resource):
@@ -54,7 +55,7 @@ def func(rows):
 
     def validate_data(self, ctx, filename, stream):
         dp, _ = DF.Flow(
-            DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True),
+            DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True, http_timeout=60),
             DF.update_resource(-1, name='data'),
             DF.validate(on_error=DF.schema_validator.clear),
             self.updater(ctx, lambda i: f'LOADED {i} ROWS TO DISK'),
@@ -87,7 +88,7 @@ def write_db(self, ctx, sqlite_filename, stream_name, data, resource, field_name
             # get the table's CREATE TABLE text:
             resource.db_schema = conn.execute(text('SELECT sql FROM sqlite_master WHERE type="table" AND name="data"')).fetchone()[0]
             resource.status_loaded = True
-            rts.set(ctx, f'SQLITE DATA {resource.url} HAS {resource.row_count} ROWS == {len(data)} ROWS')
+            rts.set(ctx, f'SQLITE DATA {resource.url} HAS {resource.row_count} ROWS')
         return resource
 
     async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatalog, ctx: str):
@@ -95,17 +96,22 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
             return None
         if not resource.url:
             return None
-        if not self.sem:
-            self.sem = asyncio.Semaphore(self.concurrency_limit)
+        dataset.versions['resource_analyzer'] = config.feature_versions.resource_analyzer
         if resource.status_loaded and not resource.loading_error:
+            resource.loading_error = None
             return None
         if resource.loading_error:
             if resource.loading_error.startswith('TOO MANY FIELDS'):
-                return None
+                past_num_fields = resource.loading_error.split('-')[-1]
+                past_num_fields = int(past_num_fields)
+                if past_num_fields > self.MAX_FIELDS:
+                    return None
         resource.status_selected = True
         resource.loading_error = None
         resource.status_loaded = False
         to_delete = []
+        if not self.sem:
+            self.sem = asyncio.Semaphore(self.concurrency_limit)
         try:
             async with self.sem:
                 rand = uuid.uuid4().hex
@@ -176,8 +182,8 @@
                         field.min_value = str(min(true_values))
                     except:
                         pass
-            if len(field_names) > 50:
-                resource.loading_error = f'TOO MANY FIELDS - {len(resource.fields)}'
+            if len(field_names) > self.MAX_FIELDS:
+                resource.loading_error = f'TOO MANY FIELDS - {len(field_names)}'
                 rts.set(ctx, f'SKIPPING {resource.url} TOO MANY FIELDS')
                 return
 
@@ -195,7 +201,6 @@
 
             finally:
                 rts.clear(ctx)
-                dataset.versions['resource_analyzer'] = config.feature_versions.resource_analyzer
         finally:
             for filename in to_delete:
                 try:
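
A note on the retry logic this patch introduces: resources that previously failed with `TOO MANY FIELDS` are no longer skipped unconditionally. The recorded error embeds the offending field count (`TOO MANY FIELDS - <count>`), so the processor can re-attempt any resource whose count now fits under the raised `MAX_FIELDS` limit (1000, up from the hard-coded 50). Below is a minimal standalone sketch of that decision, assuming the error strings follow the format written by this patch; `should_retry` is a hypothetical helper for illustration, not part of the codebase:

```python
MAX_FIELDS = 1000  # mirrors ResourceProcessor.MAX_FIELDS introduced in this patch


def should_retry(loading_error: str | None) -> bool:
    """Decide whether a previously failed resource is worth re-processing."""
    if not loading_error:
        # No recorded failure: nothing to retry.
        return False
    if not loading_error.startswith('TOO MANY FIELDS'):
        # Other failure kinds fall through to re-processing in the patched code.
        return True
    # Errors are recorded as 'TOO MANY FIELDS - <count>', so the past count
    # is recoverable from the last '-'-separated token.
    past_num_fields = int(loading_error.split('-')[-1])
    return past_num_fields <= MAX_FIELDS


assert should_retry('TOO MANY FIELDS - 60')        # failed under the old limit of 50
assert not should_retry('TOO MANY FIELDS - 2000')  # still over the new limit
```

Note that the patch also fixes the recorded count itself: the error string now interpolates `len(field_names)` (the value actually compared against the limit) rather than `len(resource.fields)`, so the stored number matches the condition that triggered the skip.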