Slight improvements to resource processing
akariv committed Jun 12, 2024
1 parent 4ff5302 commit bd1c36d
Showing 1 changed file with 11 additions and 4 deletions.
odds/backend/processor/resource_processor.py (11 additions & 4 deletions)
@@ -71,11 +71,12 @@ def load_sample(self, ctx, stream_name):
         rts.set(ctx, f'READ DATA {len(data)} ROWS from {stream_name}')
         return data
 
-    def write_db(self, ctx, sqlite_filename, stream_name, data, resource):
+    def write_db(self, ctx, sqlite_filename, stream_name, data, resource, field_names):
         sqlite_url = f'sqlite:///{sqlite_filename}'
         engine = create_engine(sqlite_url)
         DF.Flow(
             DF.unstream(stream_name),
+            DF.select_fields(field_names),
             self.updater(ctx, lambda i: f'DUMPED {i} ROWS TO SQLITE'),
             DF.dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine),
         ).process()
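
Note: `write_db` now receives the list of validated column names and inserts a `DF.select_fields` step ahead of the dump, so only columns that passed validation reach the SQLite file. A minimal sketch of that step, assuming standard `dataflows` semantics; the sample rows and output target below are illustrative, not from the repository:

```python
import dataflows as DF

# Two sample rows; 'junk' stands in for a column that failed validation.
rows = [
    {'id': 1, 'name': 'a', 'junk': 'x'},
    {'id': 2, 'name': 'b', 'junk': 'y'},
]

DF.Flow(
    rows,
    DF.select_fields(['id', 'name']),  # keep only the allow-listed columns
    DF.dump_to_path('out'),            # plays the role of dump_to_sql in the diff
).process()
```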
@@ -98,6 +99,9 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
             self.sem = asyncio.Semaphore(self.concurrency_limit)
         if resource.status_loaded and not resource.loading_error:
             return None
+        if resource.loading_error:
+            if resource.loading_error.startswith('TOO MANY FIELDS'):
+                return None
         resource.status_selected = True
         resource.loading_error = None
         resource.status_loaded = False
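
Note: this guard makes a prior `TOO MANY FIELDS` failure sticky, so an over-wide resource is skipped on subsequent runs instead of being fetched and validated again. The two nested `if`s collapse to a single condition; an equivalent sketch using the same names as the diff:

```python
# Equivalent single-condition form of the guard added above.
if resource.loading_error and resource.loading_error.startswith('TOO MANY FIELDS'):
    return None  # permanently skip resources that were already too wide
```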
@@ -146,6 +150,7 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
                     return
 
                 resource.fields = []
+                field_names = []
                 for field in potential_fields:
                     col_name = field.name
 
@@ -154,19 +159,21 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
                     if len(true_values) == 0:
                         continue
                     resource.fields.append(field)
+                    field_names.append(col_name)
                     try:
-                        field.sample_values = [str(x) for x, _ in Counter(values).most_common(10)]
+                        field.sample_values = [str(x) for x, _ in Counter(true_values).most_common(10)]
                     except:
                         pass
                     if len(values) > 0:
                         field.missing_values_percent = int(100 * (len(values) - len(true_values)) / len(values))
                     if field.data_type in ('number', 'integer', 'date', 'time', 'datetime'):
+                        true_values = set(true_values)
                         try:
                             field.max_value = str(max(true_values))
                             field.min_value = str(min(true_values))
                         except:
                             pass
-                if len(resource.fields) > 50:
+                if len(field_names) > 50:
                     resource.loading_error = f'TOO MANY FIELDS - {len(resource.fields)}'
                     rts.set(ctx, f'SKIPPING {resource.url} TOO MANY FIELDS')
                     return
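
Note: three fixes land in this loop. Sample values are now drawn from `true_values` (the non-empty values) instead of the raw `values`, numeric and date columns are deduplicated with `set()` before taking `min`/`max`, and the 50-column cutoff counts `field_names`. A self-contained sketch of the per-column statistics, using a hypothetical `column_stats` helper that does not exist in the repo:

```python
from collections import Counter

def column_stats(values):
    # Non-null values, analogous to true_values in the diff above.
    true_values = [v for v in values if v is not None]
    stats = {
        # Most frequent non-null values; the fix switches from values to true_values.
        'sample_values': [str(x) for x, _ in Counter(true_values).most_common(10)],
        'missing_values_percent': int(100 * (len(values) - len(true_values)) / len(values)) if values else None,
    }
    deduped = set(true_values)  # dedupe before min/max, mirroring set(true_values)
    if deduped:
        stats['min_value'], stats['max_value'] = str(min(deduped)), str(max(deduped))
    return stats

print(column_stats([3, 1, None, 3, 2]))
# {'sample_values': ['3', '1', '2'], 'missing_values_percent': 20, 'min_value': '1', 'max_value': '3'}
```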
@@ -175,7 +182,7 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
 
                 sqlite_filename = f'{TMP_DIR}/{rand}.sqlite'
                 to_delete.append(sqlite_filename)
-                resource = await asyncio.to_thread(self.write_db, ctx, sqlite_filename, stream.name, data, resource)
+                resource = await asyncio.to_thread(self.write_db, ctx, sqlite_filename, stream.name, data, resource, field_names)
                 await store.storeDB(resource, dataset, sqlite_filename, ctx)
 
             except Exception as e:
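Note: `write_db` drives a blocking `dataflows` pipeline into SQLite, so `process` runs it via `asyncio.to_thread` to keep the event loop responsive; the call site simply gains the extra `field_names` argument. A simplified, self-contained sketch of the pattern with stand-in objects, not the repo's actual code:

```python
import asyncio
import sqlite3

def write_db(path, rows):
    # Blocking, synchronous work: run in a worker thread so it
    # does not stall the asyncio event loop.
    with sqlite3.connect(path) as conn:
        conn.execute('CREATE TABLE IF NOT EXISTS data (value TEXT)')
        conn.executemany('INSERT INTO data VALUES (?)', [(r,) for r in rows])

async def main():
    await asyncio.to_thread(write_db, '/tmp/example.sqlite', ['a', 'b'])

asyncio.run(main())
```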
