Use asyncio.to_thread to avoid blocking
akariv committed Jun 12, 2024
1 parent e3632fd commit 6960819
Showing 1 changed file with 42 additions and 28 deletions.
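The change takes the blocking dataflows and SQLite work that previously ran inline in the async process coroutine and dispatches it to worker threads with asyncio.to_thread (Python 3.9+), so the event loop stays responsive while rows are loaded, validated, and dumped. A minimal sketch of the pattern, with a hypothetical blocking_work helper standing in for the real processing:

    import asyncio
    import time

    def blocking_work(path: str) -> int:
        # Hypothetical stand-in for the synchronous dataflows processing.
        time.sleep(1)  # simulates disk- or network-bound work
        return 42

    async def main() -> None:
        # asyncio.to_thread runs the callable in a worker thread and
        # awaits its result; other tasks keep running in the meantime.
        result = await asyncio.to_thread(blocking_work, 'data.csv')
        print(result)

    asyncio.run(main())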
odds/backend/processor/resource_processor.py
@@ -51,6 +51,44 @@ def func(rows):
yield row
return func


+def validate_data(self, ctx, filename, stream):
+    dp, _ = DF.Flow(
+        DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True),
+        DF.update_resource(-1, name='data'),
+        DF.validate(on_error=DF.schema_validator.clear),
+        self.updater(ctx, lambda i: f'LOADED {i} ROWS TO DISK'),
+        DF.stream(stream)
+    ).process()
+    rts.set(ctx, f'VALIDATED {filename} to {stream.name}')
+    return dp
+
+def load_sample(self, ctx, stream_name):
+    data = DF.Flow(
+        DF.unstream(stream_name),
+        self.limiter(),
+    ).results(on_error=None)[0][0]
+    rts.set(ctx, f'READ DATA {len(data)} ROWS from {stream_name}')
+    return data
+
+def write_db(self, ctx, sqlite_filename, stream_name, data, resource):
+    sqlite_url = f'sqlite:///{sqlite_filename}'
+    engine = create_engine(sqlite_url)
+    DF.Flow(
+        DF.unstream(stream_name),
+        self.updater(ctx, lambda i: f'DUMPED {i} ROWS TO SQLITE'),
+        DF.dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine),
+    ).process()
+    rts.set(ctx, f'DUMPED {len(data)} ROWS from {resource.url} TO {sqlite_filename}')
+    with engine.connect() as conn:
+        # row count:
+        resource.row_count = conn.execute(text('SELECT COUNT(*) FROM data')).fetchone()[0]
+        # get the table's CREATE TABLE text:
+        resource.db_schema = conn.execute(text('SELECT sql FROM sqlite_master WHERE type="table" AND name="data"')).fetchone()[0]
+    resource.status_loaded = True
+    rts.set(ctx, f'SQLITE DATA {resource.url} HAS {resource.row_count} ROWS == {len(data)} ROWS')
+    return resource

async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatalog, ctx: str):
if not ResourceProcessor.check_format(resource):
    return None
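The first hunk above extracts three synchronous helpers — validate_data, load_sample and write_db — so that each stage can be handed to a worker thread as a unit. validate_data persists the validated rows to disk through DF.stream and load_sample reads them back through DF.unstream. A rough sketch of that round trip using the same dataflows processors, assuming DF.stream accepts an open writable file and DF.unstream a path, as in this file ('data.csv' and 'rows.stream' are placeholder names):

    import dataflows as DF

    with open('rows.stream', 'w') as out:
        dp, _ = DF.Flow(
            DF.load('data.csv', deduplicate_headers=True),
            DF.update_resource(-1, name='data'),
            DF.validate(on_error=DF.schema_validator.clear),
            DF.stream(out),              # persist validated rows to disk
        ).process()

    rows, _, _ = DF.Flow(
        DF.unstream('rows.stream'),      # read the persisted rows back
    ).results(on_error=None)
    print(len(rows[0]), 'rows')          # rows[0]: first resource's rows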
@@ -93,25 +131,14 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatalog, ctx: str):
rts.set(ctx, f'DOWNLOADED {report} BYTES from {resource.url} to {filename}')

rts.set(ctx, f'DOWNLOADED {total_size} BYTES from {resource.url} to {filename}')
-dp, _ = DF.Flow(
-    DF.load(filename, override_schema={'missingValues': self.MISSING_VALUES}, deduplicate_headers=True),
-    DF.update_resource(-1, name='data'),
-    DF.validate(on_error=DF.schema_validator.clear),
-    self.updater(ctx, lambda i: f'LOADED {i} ROWS TO DISK'),
-    DF.stream(stream)
-).process()
-rts.set(ctx, f'VALIDATED {total_size} BYTES from {resource.url} to {stream.name}')
+dp = await asyncio.to_thread(self.validate_data, ctx, filename, stream)
potential_fields = [
    Field(name=field['name'], data_type=field['type'])
    for field in
    dp.resources[0].descriptor['schema']['fields']
]

-data = DF.Flow(
-    DF.unstream(stream.name),
-    self.limiter(),
-).results(on_error=None)[0][0]
-rts.set(ctx, f'READ DATA {total_size} BYTES / {len(data)} ROWS from {resource.url}')
+data = await asyncio.to_thread(self.load_sample, ctx, stream.name)

if len(data) == 0:
    resource.loading_error = 'NO DATA'
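With the load/validate and sampling stages awaited through asyncio.to_thread, the coroutine yields whenever a stage is running in its thread, so a caller could interleave several resources on one event loop. A hedged sketch (process_all and the resources list are assumptions; the process signature is the one shown above):

    import asyncio

    async def process_all(processor, resources, dataset, catalog, ctx):
        # Each process() call parks its blocking stages in worker threads,
        # letting the loop interleave many resources' downloads and dumps.
        return await asyncio.gather(*(
            processor.process(resource, dataset, catalog, ctx)
            for resource in resources
        ))

Note that to_thread frees the event loop but not the GIL, so CPU-heavy parsing in one thread still competes with other Python threads for interpreter time.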
Expand Down Expand Up @@ -145,23 +172,10 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
return

stream.close()

sqlite_filename = f'{TMP_DIR}/{rand}.sqlite'
to_delete.append(sqlite_filename)
-sqlite_url = f'sqlite:///{sqlite_filename}'
-engine = create_engine(sqlite_url)
-DF.Flow(
-    DF.unstream(stream.name),
-    self.updater(ctx, lambda i: f'DUMPED {i} ROWS TO SQLITE'),
-    DF.dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine),
-).process()
-rts.set(ctx, f'DUMPED {total_size} BYTES / {len(data)} ROWS from {resource.url} TO {sqlite_filename}')
-with engine.connect() as conn:
-    # row count:
-    resource.row_count = conn.execute(text('SELECT COUNT(*) FROM data')).fetchone()[0]
-    # get the table's CREATE TABLE text:
-    resource.db_schema = conn.execute(text('SELECT sql FROM sqlite_master WHERE type="table" AND name="data"')).fetchone()[0]
-resource.status_loaded = True
-rts.set(ctx, f'SQLITE DATA {resource.url} HAS {resource.row_count} ROWS == {len(data)} ROWS')
+resource = await asyncio.to_thread(self.write_db, ctx, sqlite_filename, stream.name, data, resource)
await store.storeDB(resource, dataset, sqlite_filename, ctx)

except Exception as e:
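For completeness, the post-dump introspection that write_db performs can be reproduced against any of the generated SQLite files. A small sketch with the table name and queries taken from the diff ('example.sqlite' is a placeholder path):

    from sqlalchemy import create_engine, text

    engine = create_engine('sqlite:///example.sqlite')
    with engine.connect() as conn:
        # Row count of the dumped 'data' table.
        row_count = conn.execute(text('SELECT COUNT(*) FROM data')).fetchone()[0]
        # The CREATE TABLE statement, straight from sqlite_master.
        db_schema = conn.execute(text(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='data'"
        )).fetchone()[0]
    print(row_count, db_schema)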
