Merge pull request #55 from statisticsnorway/STAT-16-GCS-datasets
Compatibility with datasets in GCS buckets
mmwinther authored Sep 2, 2022
2 parents 284f9e0 + 6c4eb7c commit a38f7b7
Showing 11 changed files with 1,681 additions and 250 deletions.
2 changes: 1 addition & 1 deletion datadoc/app.py
@@ -95,7 +95,7 @@ def main(dataset_path: str = None):
# Assume running in server mode is better (largely for development purposes)
logger.debug("Starting in development mode")
app = build_app(Dash)
-        app.run(debug=True)
+        app.run(debug=True, use_reloader=False)


if __name__ == "__main__":
106 changes: 59 additions & 47 deletions datadoc/backend/DataDocMetadata.py
@@ -5,12 +5,14 @@
import pathlib
from typing import Dict, Optional

-from datadoc_model import Model
-from datadoc_model.Enums import DatasetState

import datadoc.frontend.fields.DisplayDataset as DisplayDataset
import datadoc.frontend.fields.DisplayVariables as DisplayVariables
-from datadoc.backend.DatasetReader import DatasetReader
+from datadoc.backend.DatasetParser import DatasetParser
+from datadoc.backend.StorageAdapter import StorageAdapter
from datadoc.utils import calculate_percentage
+from datadoc_model import Model
+from datadoc_model.Enums import DatasetState

logger = logging.getLogger(__name__)

@@ -38,21 +40,20 @@
]
)

+METADATA_DOCUMENT_FILE_SUFFIX = "__DOC.json"


class DataDocMetadata:
def __init__(self, dataset):
-        self.dataset = dataset
-        self.dataset_full_path = pathlib.Path(self.dataset)
-        self.dataset_directory = self.dataset_full_path.resolve().parent
-        self.dataset_stem = self.dataset_full_path.stem  # filename without file ending
-        self.metadata_document_name = str(self.dataset_stem) + "__DOC.json"
-        self.metadata_document_full_path = self.dataset_directory.joinpath(
-            self.metadata_document_name
-        )
-        self.dataset_state: DatasetState = self.get_dataset_state(
-            self.dataset_full_path
-        )
-        self.dataset_version = self.get_dataset_version(self.dataset_stem)
+        self.dataset: str = dataset
+        self.short_name: str = pathlib.Path(
+            self.dataset
+        ).stem  # filename without file ending
+        self.metadata_document: StorageAdapter = StorageAdapter.for_path(
+            StorageAdapter.for_path(self.dataset).parent()
+        )
+        self.metadata_document.joinpath(self.short_name + METADATA_DOCUMENT_FILE_SUFFIX)
+        self.dataset_state: DatasetState = self.get_dataset_state(self.dataset)
try:
self.current_user = os.environ["JUPYTERHUB_USER"]
except KeyError:
@@ -73,20 +74,23 @@ def __init__(self, dataset):

self.read_metadata_document()

-    def get_dataset_state(
-        self, dataset_path: pathlib.Path = None
-    ) -> Optional[DatasetState]:
+    def get_dataset_state(self, dataset: str) -> Optional[DatasetState]:
        """Use the path to attempt to guess the state of the dataset"""

-        if dataset_path is None:
-            dataset_path = self.dataset_full_path
-        dataset_path_parts = list(dataset_path.parts)
-        if "kildedata" in dataset_path_parts:
+        if dataset is None:
+            return None
+        dataset_path_parts = list(pathlib.Path(dataset).parts)
+        if "utdata" in dataset_path_parts:
+            return DatasetState.OUTPUT_DATA
+        elif "statistikk" in dataset_path_parts:
+            return DatasetState.STATISTIC
+        elif "klargjorte-data" in dataset_path_parts:
+            return DatasetState.PROCESSED_DATA
+        elif "klargjorte_data" in dataset_path_parts:
+            return DatasetState.PROCESSED_DATA
+        elif "kildedata" in dataset_path_parts:
            return DatasetState.SOURCE_DATA
        elif "inndata" in dataset_path_parts:
            return DatasetState.INPUT_DATA
-        elif "klargjorte_data" in dataset_path_parts:
-            return DatasetState.PROCESSED_DATA
        else:
            return None

@@ -106,32 +110,42 @@ def get_dataset_version(self, dataset_stem: str) -> Optional[str]:

def read_metadata_document(self):
fresh_metadata = {}
-        if self.metadata_document_full_path.exists():
-            with open(self.metadata_document_full_path, encoding="utf-8") as file:
-                fresh_metadata = json.load(file)
-            logger.info(
-                f"Opened existing metadata file {self.metadata_document_full_path}"
-            )
-
-            variables_list = fresh_metadata.pop("variables", None)
-
-            self.meta.variables = [Model.DataDocVariable(**v) for v in variables_list]
-            self.meta.dataset = Model.DataDocDataSet(
-                **fresh_metadata.pop("dataset", None)
-            )
+        if self.metadata_document.exists():
+            try:
+                with self.metadata_document.open(mode="r", encoding="utf-8") as file:
+                    fresh_metadata = json.load(file)
+                logger.info(
+                    f"Opened existing metadata file {self.metadata_document.location}"
+                )
+
+                variables_list = fresh_metadata.pop("variables", None)
+
+                self.meta.variables = [
+                    Model.DataDocVariable(**v) for v in variables_list
+                ]
+                self.meta.dataset = Model.DataDocDataSet(
+                    **fresh_metadata.pop("dataset", None)
+                )
+            except json.JSONDecodeError:
+                logger.warning(
+                    f"Could not open existing metadata file {self.metadata_document.location}. \
+Falling back to collecting data from the dataset",
+                    exc_info=True,
+                )
+                self.extract_metadata_from_dataset()
        else:
-            self.generate_new_metadata_document()
+            self.extract_metadata_from_dataset()

self.variables_lookup = {v.short_name: v for v in self.meta.variables}

-    def generate_new_metadata_document(self):
-        self.ds_schema = DatasetReader.for_file(self.dataset)
+    def extract_metadata_from_dataset(self):
+        self.ds_schema = DatasetParser.for_file(self.dataset)

self.meta.dataset = Model.DataDocDataSet(
-            short_name=self.dataset_stem,
+            short_name=self.short_name,
dataset_state=self.dataset_state,
-            version=self.dataset_version,
-            data_source_path=str(self.dataset_full_path),
+            version=self.get_dataset_version(self.short_name),
+            data_source_path=self.dataset,
created_date=self.current_datetime,
created_by=self.current_user,
)
@@ -140,10 +154,8 @@ def generate_new_metadata_document(self):

def write_metadata_document(self) -> None:
"""Write all currently known metadata to file"""
-        self.metadata_document_full_path.write_text(
-            self.meta.json(indent=4, sort_keys=False), encoding="utf-8"
-        )
-        logger.info(f"Saved metadata document {self.metadata_document_full_path}")
+        self.metadata_document.write_text(self.meta.json(indent=4, sort_keys=False))
+        logger.info(f"Saved metadata document {self.metadata_document.location}")

@property
def percent_complete(self) -> int:
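For orientation, a minimal sketch (not part of this commit) of how DataDocMetadata behaves after these changes when pointed at a dataset in a bucket. The bucket and folder names below are hypothetical, and the call assumes the dataset file actually exists and that GCS credentials are available, since the constructor also reads or initialises the metadata document.

from datadoc.backend.DataDocMetadata import DataDocMetadata

# Hypothetical GCS path; bucket and folder names are illustrative only.
meta = DataDocMetadata("gs://my-bucket/produkt/klargjorte_data/person_data_v1.parquet")

print(meta.short_name)                  # person_data_v1
print(meta.dataset_state)               # DatasetState.PROCESSED_DATA, guessed from the path
print(meta.metadata_document.location)  # gs://my-bucket/produkt/klargjorte_data/person_data_v1__DOC.json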
datadoc/backend/DatasetReader.py → datadoc/backend/DatasetParser.py
@@ -4,11 +4,14 @@

import pandas as pd
import pyarrow.parquet as pq
-from datadoc import state
from datadoc_model.Enums import Datatype
+from datadoc_model.LanguageStrings import LanguageStrings
from datadoc_model.Model import DataDocVariable

-TDatasetReader = TypeVar("TDatasetReader", bound="DatasetReader")
+from datadoc import state
+from datadoc.backend.StorageAdapter import StorageAdapter
+
+TDatasetParser = TypeVar("TDatasetParser", bound="DatasetParser")

KNOWN_INTEGER_TYPES = (
"int",
@@ -65,19 +68,20 @@
KNOWN_BOOLEAN_TYPES = ("bool", "bool_", "boolean")


-class DatasetReader(ABC):
-    def __init__(self, dataset):
-        self.dataset = dataset
+class DatasetParser(ABC):
+    def __init__(self, dataset: str):
+        self.dataset: StorageAdapter = StorageAdapter.for_path(dataset)

@staticmethod
-    def for_file(dataset: str) -> TDatasetReader:
+    def for_file(dataset: str) -> TDatasetParser:
"""Factory method to return the correct subclass based on the given dataset file"""
supported_file_types = {
"parquet": DatasetReaderParquet,
"sas7bdat": DatasetReaderSas7bdat,
"parquet": DatasetParserParquet,
"sas7bdat": DatasetParserSas7Bdat,
}
file_type = "Unknown"
try:
-            file_type = str(pathlib.Path(dataset)).lower().split(".")[1]
+            file_type = str(pathlib.Path(dataset)).lower().split(".")[-1]
# Extract the appropriate reader class from the SUPPORTED_FILE_TYPES dict and return an instance of it
reader = supported_file_types[file_type](dataset)
except IndexError as e:
@@ -86,7 +90,7 @@ def for_file(dataset: str) -> TDatasetReader:
f"Could not recognise file type for provided {dataset = }. Supported file types are: {', '.join(supported_file_types.keys())}"
) from e
except KeyError as e:
-            # In this case the file type is not supported so we throw a helpful exception
+            # In this case the file type is not supported, so we throw a helpful exception
raise NotImplementedError(
f"{file_type = } is not supported. Please open one of the following supported files types: {', '.join(supported_file_types.keys())} or contact the maintainers to request support."
) from e
@@ -116,34 +120,36 @@ def get_fields(self) -> List[DataDocVariable]:
"""Abstract method, must be implemented by subclasses"""


-class DatasetReaderParquet(DatasetReader):
-    def __init__(self, dataset):
+class DatasetParserParquet(DatasetParser):
+    def __init__(self, dataset: str):
super().__init__(dataset)

def get_fields(self) -> List[DataDocVariable]:
fields = []
-        data_table = pq.read_table(self.dataset)
-        for data_field in data_table.schema:
-            fields.append(
-                DataDocVariable(
-                    short_name=data_field.name,
-                    data_type=self.transform_data_type(str(data_field.type)),
-                )
-            )
+        with self.dataset.open(mode="rb") as f:
+            data_table = pq.read_table(f)
+            for data_field in data_table.schema:
+                fields.append(
+                    DataDocVariable(
+                        short_name=data_field.name,
+                        data_type=self.transform_data_type(str(data_field.type)),
+                    )
+                )
return fields


-class DatasetReaderSas7bdat(DatasetReader):
-    def __init__(self, dataset):
+class DatasetParserSas7Bdat(DatasetParser):
+    def __init__(self, dataset: str):
super().__init__(dataset)

def get_fields(self) -> List[DataDocVariable]:
fields = []
-        # Use an iterator to avoid reading in the entire dataset
-        sas_reader = pd.read_sas(self.dataset, iterator=True)
+        with self.dataset.open(mode="rb") as f:
+            # Use an iterator to avoid reading in the entire dataset
+            sas_reader = pd.read_sas(f, format="sas7bdat", iterator=True)

-        # Get the first row from the iterator
-        row = next(sas_reader)
+            # Get the first row from the iterator
+            row = next(sas_reader)

# Get all the values from the row and loop through them
for i, v in enumerate(row.values.tolist()[0]):
@@ -152,7 +158,9 @@ def get_fields(self) -> List[DataDocVariable]:
short_name=sas_reader.columns[i].name,
# Assume labels are defined in the default language (NORSK_BOKMÅL)
# If this is not correct, the user may fix it via the UI
-                    name={state.current_metadata_language: sas_reader.columns[i].label},
+                    name=LanguageStrings(
+                        **{state.current_metadata_language: sas_reader.columns[i].label}
+                    ),
# Access the python type for the value and transform it to a DataDoc Data type
data_type=self.transform_data_type(type(v).__name__.lower()),
)
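As a quick illustration (not part of the commit), this is roughly how the renamed DatasetParser factory is used. The file path is hypothetical and must point at an existing file with one of the supported extensions.

from datadoc.backend.DatasetParser import DatasetParser

# Hypothetical path; "parquet" and "sas7bdat" are the supported file types.
parser = DatasetParser.for_file("klargjorte_data/person_data_v1.parquet")

# get_fields() reads the schema (or first row, for SAS) and returns DataDocVariable objects.
for variable in parser.get_fields():
    print(variable.short_name, variable.data_type)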
111 changes: 111 additions & 0 deletions datadoc/backend/StorageAdapter.py
@@ -0,0 +1,111 @@
import logging
import os
import pathlib
from io import IOBase, TextIOWrapper
from typing import Protocol
from urllib.parse import urlsplit, urlunsplit

from dapla import AuthClient, FileClient
from gcsfs import GCSFileSystem

GCS_PROTOCOL_PREFIX = "gs://"

logger = logging.getLogger(__name__)


class GCSObject:
    def __init__(self, path: str):
        self._url = urlsplit(path)

        if AuthClient.is_ready():
            # Running on Dapla, rely on dapla-toolbelt for auth
            self.fs = FileClient.get_gcs_file_system()
        else:
            # All other environments, rely on Standard Google credential system
            # If this doesn't work for you, try running the following commands:
            #
            # gcloud auth application-default revoke
            # gcloud auth application-default login
            self.fs = GCSFileSystem()

    def _rebuild_url(self, new_path: str) -> str:
        return urlunsplit((self._url.scheme, self._url.netloc, new_path, None, None))

    def open(self, **kwargs) -> IOBase:
        return self.fs.open(self.location, **kwargs)

    def parent(self) -> str:
        parent = os.path.dirname(self._url.path)
        return self._rebuild_url(parent)

    def joinpath(self, part):
        """Modify the path in place"""
        self._url = urlsplit(self._rebuild_url(os.path.join(self._url.path, part)))

    def exists(self) -> bool:
        return self.fs.exists(self.location)

    def write_text(self, text: str) -> None:
        f: TextIOWrapper
        with self.fs.open(self.location, mode="w") as f:
            f.write(text)

    @property
    def location(self) -> str:
        return urlunsplit(self._url)


class LocalFile:
    def __init__(self, path):
        self._path_object: pathlib.Path = pathlib.Path(path)

    def open(self, **kwargs) -> IOBase:
        return open(str(self._path_object), **kwargs)

    def parent(self) -> str:
        return str(self._path_object.resolve().parent)

    def joinpath(self, part):
        """Modify the path in place"""
        self._path_object = self._path_object.joinpath(part)

    def exists(self) -> bool:
        return self._path_object.exists()

    def write_text(self, text: str) -> None:
        self._path_object.write_text(text, encoding="utf-8")

    @property
    def location(self) -> str:
        return str(self._path_object)


class StorageAdapter(Protocol):
    @staticmethod
    def for_path(path: str):
        """
        Return a concrete class implementing this Protocol based on the structure of the path.
        """
        if path.startswith(GCS_PROTOCOL_PREFIX):
            return GCSObject(path)
        else:
            return LocalFile(path)

    def open(self, **kwargs) -> IOBase:
        ...

    def parent(self) -> str:
        ...

    def joinpath(self, part: str) -> None:
        ...

    def exists(self) -> bool:
        ...

    def write_text(self, text: str) -> None:
        ...

    @property
    def location(self) -> str:
        ...
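A minimal usage sketch of the new StorageAdapter factory (not part of the commit): the same calls work for a local path and a gs:// path, the latter assuming Google credentials are configured. The paths and bucket name are hypothetical.

from datadoc.backend.StorageAdapter import StorageAdapter

for path in (
    "klargjorte_data/person_data_v1__DOC.json",                 # handled by LocalFile
    "gs://my-bucket/klargjorte_data/person_data_v1__DOC.json",  # handled by GCSObject
):
    document = StorageAdapter.for_path(path)
    if document.exists():
        with document.open(mode="r", encoding="utf-8") as file:
            print(file.read())
    else:
        document.write_text("{}")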
(The remaining 7 changed files in this commit are not shown here.)
