Commit
Add http timeout + max fields processing
akariv committed Jun 13, 2024
1 parent f2f63f1 commit dffcbfe
Showing 1 changed file with 13 additions and 8 deletions.
odds/backend/processor/resource_processor.py (21 changes: 13 additions & 8 deletions)
@@ -25,6 +25,7 @@ class ResourceProcessor:
     ALLOWED_FORMATS = ['csv', 'xlsx', 'xls']
     MISSING_VALUES = ['None', 'NULL', 'N/A', 'NA', 'NAN', 'NaN', 'nan', '-']
     BIG_FILE_SIZE = 10000000
+    MAX_FIELDS = 1000
 
     @staticmethod
     def check_format(resource: Resource):
@@ -54,7 +55,7 @@ def func(rows):
 
     def validate_data(self, ctx, filename, stream):
         dp, _ = DF.Flow(
-            DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True),
+            DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True, http_timeout=60),
             DF.update_resource(-1, name='data'),
             DF.validate(on_error=DF.schema_validator.clear),
             self.updater(ctx, lambda i: f'LOADED {i} ROWS TO DISK'),
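
The new http_timeout=60 option is handed through by dataflows to its underlying HTTP loader, so a stalled remote download now fails after 60 seconds instead of hanging the processor indefinitely. A minimal standalone sketch of the same load step (the URL is a placeholder, and the exact passthrough of http_timeout depends on the installed dataflows/tabulator versions):

import dataflows as DF

MISSING_VALUES = ['None', 'NULL', 'N/A', 'NA', 'NAN', 'NaN', 'nan', '-']

# Load a remote CSV with the same options validate_data() now uses;
# http_timeout=60 aborts the request once the server stalls for 60 seconds.
dp, _ = DF.Flow(
    DF.load(
        'https://example.com/data.csv',  # placeholder URL
        override_schema={'missingValues': MISSING_VALUES},
        deduplicate_headers=True,
        http_timeout=60,
    ),
    DF.update_resource(-1, name='data'),
    DF.validate(on_error=DF.schema_validator.clear),
).process()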
@@ -87,25 +88,30 @@ def write_db(self, ctx, sqlite_filename, stream_name, data, resource, field_name
             # get the table's CREATE TABLE text:
             resource.db_schema = conn.execute(text('SELECT sql FROM sqlite_master WHERE type="table" AND name="data"')).fetchone()[0]
             resource.status_loaded = True
-            rts.set(ctx, f'SQLITE DATA {resource.url} HAS {resource.row_count} ROWS == {len(data)} ROWS')
+            rts.set(ctx, f'SQLITE DATA {resource.url} HAS {resource.row_count} ROWS')
         return resource
 
     async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatalog, ctx: str):
         if not ResourceProcessor.check_format(resource):
             return None
         if not resource.url:
             return None
+        if not self.sem:
+            self.sem = asyncio.Semaphore(self.concurrency_limit)
+        dataset.versions['resource_analyzer'] = config.feature_versions.resource_analyzer
         if resource.status_loaded and not resource.loading_error:
             resource.loading_error = None
             return None
         if resource.loading_error:
-            return None
+            if resource.loading_error.startswith('TOO MANY FIELDS'):
+                past_num_fields = resource.loading_error.split('-')[-1]
+                past_num_fields = int(past_num_fields)
+                if past_num_fields > self.MAX_FIELDS:
+                    return None
         resource.status_selected = True
         resource.loading_error = None
         resource.status_loaded = False
         to_delete = []
-        if not self.sem:
-            self.sem = asyncio.Semaphore(self.concurrency_limit)
         try:
             async with self.sem:
                 rand = uuid.uuid4().hex
@@ -176,8 +182,8 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatalog, ctx: str):
                             field.min_value = str(min(true_values))
                     except:
                         pass
-                if len(field_names) > 50:
-                    resource.loading_error = f'TOO MANY FIELDS - {len(resource.fields)}'
+                if len(field_names) > self.MAX_FIELDS:
+                    resource.loading_error = f'TOO MANY FIELDS - {len(field_names)}'
                     rts.set(ctx, f'SKIPPING {resource.url} TOO MANY FIELDS')
                     return
 
@@ -195,7 +201,6 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatalog, ctx: str):
             finally:
                 rts.clear(ctx)
 
-        dataset.versions['resource_analyzer'] = config.feature_versions.resource_analyzer
         finally:
             for filename in to_delete:
                 try:
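
Taken together, the process() changes replace the hard-coded 50-field cap with MAX_FIELDS = 1000 and reuse the field count recorded in the 'TOO MANY FIELDS - <n>' error string, so resources that were skipped under the old, stricter limit are picked up and retried. A standalone sketch of that retry gate, assuming the same error-string format (the function name is illustrative, not from the diff):

MAX_FIELDS = 1000

def should_stay_skipped(loading_error):
    # Keep skipping a resource only if its recorded failure would still
    # exceed the current cap; every other past error is retried.
    if not loading_error:
        return False
    if loading_error.startswith('TOO MANY FIELDS'):
        # Error strings look like 'TOO MANY FIELDS - <count>'.
        past_num_fields = int(loading_error.split('-')[-1])
        return past_num_fields > MAX_FIELDS
    return False

For example, a resource that previously failed with 'TOO MANY FIELDS - 120' now falls through the gate and is reprocessed, since 120 is below the new cap.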
