diff --git a/odds/backend/processor/resource_processor.py b/odds/backend/processor/resource_processor.py
index c9ea966..b14e038 100644
--- a/odds/backend/processor/resource_processor.py
+++ b/odds/backend/processor/resource_processor.py
@@ -71,11 +71,12 @@ def load_sample(self, ctx, stream_name):
         rts.set(ctx, f'READ DATA {len(data)} ROWS from {stream_name}')
         return data
 
-    def write_db(self, ctx, sqlite_filename, stream_name, data, resource):
+    def write_db(self, ctx, sqlite_filename, stream_name, data, resource, field_names):
         sqlite_url = f'sqlite:///{sqlite_filename}'
         engine = create_engine(sqlite_url)
         DF.Flow(
             DF.unstream(stream_name),
+            DF.select_fields(field_names),
             self.updater(ctx, lambda i: f'DUMPED {i} ROWS TO SQLITE'),
             DF.dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine),
         ).process()
@@ -98,6 +99,9 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
             self.sem = asyncio.Semaphore(self.concurrency_limit)
         if resource.status_loaded and not resource.loading_error:
             return None
+        if resource.loading_error:
+            if resource.loading_error.startswith('TOO MANY FIELDS'):
+                return None
         resource.status_selected = True
         resource.loading_error = None
         resource.status_loaded = False
@@ -146,6 +150,7 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
                     return
 
                 resource.fields = []
+                field_names = []
                 for field in potential_fields:
                     col_name = field.name
 
@@ -154,19 +159,21 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
                     if len(true_values) == 0:
                         continue
                     resource.fields.append(field)
+                    field_names.append(col_name)
                     try:
-                        field.sample_values = [str(x) for x, _ in Counter(values).most_common(10)]
+                        field.sample_values = [str(x) for x, _ in Counter(true_values).most_common(10)]
                     except:
                         pass
                     if len(values) > 0:
                         field.missing_values_percent = int(100 * (len(values) - len(true_values)) / len(values))
                     if field.data_type in ('number', 'integer', 'date', 'time', 'datetime'):
+                        true_values = set(true_values)
                         try:
                             field.max_value = str(max(true_values))
                             field.min_value = str(min(true_values))
                         except:
                             pass
-                if len(resource.fields) > 50:
+                if len(field_names) > 50:
                     resource.loading_error = f'TOO MANY FIELDS - {len(resource.fields)}'
                     rts.set(ctx, f'SKIPPING {resource.url} TOO MANY FIELDS')
                     return
@@ -175,7 +182,7 @@ async def process(self, resource: Resource, dataset: Dataset, catalog: DataCatal
                 sqlite_filename = f'{TMP_DIR}/{rand}.sqlite'
                 to_delete.append(sqlite_filename)
 
-                resource = await asyncio.to_thread(self.write_db, ctx, sqlite_filename, stream.name, data, resource)
+                resource = await asyncio.to_thread(self.write_db, ctx, sqlite_filename, stream.name, data, resource, field_names)
                 await store.storeDB(resource, dataset, sqlite_filename, ctx)
             except Exception as e:
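
A minimal standalone sketch (not part of the patch) of the select_fields-before-dump_to_sql pattern that the new write_db applies. The inline rows, the example.sqlite file name, and the field list are invented for illustration; update_resource is only used here to give the ad-hoc rows the 'data' resource name that the dump_to_sql mapping expects:

    # Sketch only: demonstrates limiting columns before dumping to SQLite,
    # as the patched write_db does with DF.select_fields(field_names).
    from dataflows import Flow, select_fields, dump_to_sql, update_resource
    from sqlalchemy import create_engine

    rows = [
        {'a': 1, 'b': 2, 'c': 3},
        {'a': 4, 'b': 5, 'c': 6},
    ]
    field_names = ['a', 'b']  # only these columns survive, mirroring the patch

    engine = create_engine('sqlite:///example.sqlite')
    Flow(
        rows,
        update_resource('res_1', name='data'),  # raw rows default to 'res_1'
        select_fields(field_names),             # drops column 'c' before the dump
        dump_to_sql({'data': {'resource-name': 'data'}}, engine=engine),
    ).process()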