Skip to content

Commit

Permalink
Update link webdienst #386
Browse files Browse the repository at this point in the history
* apply black
  • Loading branch information
chrwm committed Dec 1, 2022
1 parent 85e9e3d commit a63e962
Showing 1 changed file with 46 additions and 24 deletions.
70 changes: 46 additions & 24 deletions open_mastr/soap_api/metadata/description.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ def __init__(self, xml=None):
self.xml = fh.read()
else:
# If no XML file is given, the file is read from an URL
zipurl = 'https://www.marktstammdatenregister.de/MaStRHilfe/files/' \
'webdienst/Dienstbeschreibung_1_2_39_Produktion.zip'
zipurl = "https://www.marktstammdatenregister.de/MaStRHilfe/files/webdienst/" \
"Dienstbeschreibung_Produktion_Version" \
"1.2.87" \ # update version here
".zip"

with urlopen(zipurl) as zipresp:
with ZipFile(BytesIO(zipresp.read())) as zfile:
self.xml = zfile.read('xsd/mastrbasetypes.xsd')


self.xml = zfile.read("xsd/mastrbasetypes.xsd")

# Parse XML and extract relevant data
parsed = xmltodict.parse(self.xml, process_namespaces=False)
self.complex_types = parsed['schema']["complexType"]
self.simple_types = parsed['schema']["simpleType"]
self.complex_types = parsed["schema"]["complexType"]
self.simple_types = parsed["schema"]["simpleType"]

# Prepare parsed data for documentational purposes
abstract_types, parameters, responses, types = self._filter_type_descriptions()
Expand Down Expand Up @@ -78,13 +78,17 @@ def _filter_type_descriptions(self):
raise ValueError("Ohh...")
else:
# Filter all functions
if item["@name"].startswith(("Get", "Set", "Erneute", "Verschiebe", "Delete")):
if item["@name"].startswith(
("Get", "Set", "Erneute", "Verschiebe", "Delete")
):
functions.append(item)

# Further split the list of functions into paramters and responses
if item["@name"].endswith("Parameter"):
if "complexContent" in item.keys():
parameters[item["@name"]] = item["complexContent"]["extension"]
parameters[item["@name"]] = item["complexContent"][
"extension"
]
else:
parameters[item["@name"]] = item
elif item["@name"].endswith("Antwort"):
Expand All @@ -111,12 +115,14 @@ def prepare_simple_type(self):

for simple_type in self.simple_types:
if "enumeration" in simple_type["restriction"]:
possible_values = [_["@value"] for _ in simple_type["restriction"]["enumeration"]]
possible_values = [
_["@value"] for _ in simple_type["restriction"]["enumeration"]
]
else:
possible_values = []
simple_types_doc[simple_type["@name"]] = {
"type": simple_type["restriction"]["@base"],
"values": possible_values
"values": possible_values,
}
return simple_types_doc

Expand All @@ -140,49 +146,61 @@ def functions_data_documentation(self):
if "annotation" in fcn["sequence"]["element"]:
fcn_data = [fcn["sequence"]["element"]]
else:
fcn_data = self.types[fcn["sequence"]["element"]["@type"].split(":")[1]]["sequence"]["element"]
fcn_data = self.types[
fcn["sequence"]["element"]["@type"].split(":")[1]
]["sequence"]["element"]
else:
print(type(fcn["sequence"]))
print(fcn["sequence"])
raise ValueError

# Add data for inherited columns from base types
if "@base" in fcn:
if not fcn["@base"] == 'mastr:AntwortBasis':
fcn_data = _collect_columns_of_base_type(self.types, fcn["@base"].split(":")[1], fcn_data)
if not fcn["@base"] == "mastr:AntwortBasis":
fcn_data = _collect_columns_of_base_type(
self.types, fcn["@base"].split(":")[1], fcn_data
)
function_docs[fcn_name] = {}
for column in fcn_data:
# Replace MaStR internal types with more general ones
if column["@type"].startswith("mastr:"):
try:
column_type = self.simple_types_prepared[column["@type"].split(":")[1]]["type"]
column_type = self.simple_types_prepared[
column["@type"].split(":")[1]
]["type"]
except KeyError:
column_type = column["@type"]
else:
column_type = column["@type"]

if "annotation" in column.keys():
description = column["annotation"]["documentation"].get("#text", None)
description = column["annotation"]["documentation"].get(
"#text", None
)
if description:
description = re.sub(" +", " ", description.replace("\n", ""))
description = re.sub(
" +", " ", description.replace("\n", "")
)
function_docs[fcn_name][column["@name"]] = {
"type": column_type,
"description": description,
"example": column["annotation"]["documentation"].get("m-ex", None)
"type": column_type,
"description": description,
"example": column["annotation"]["documentation"].get(
"m-ex", None
),
}
else:
function_docs[fcn_name][column["@name"]] = {
"type": column_type,
# TODO: insert information from simple type here
"description": None,
"example": None
"example": None,
}

# Hack in a descrition for a column that gets created after download while flattening data
function_docs["GetEinheitWind"]["HerstellerId"] = {
"type": "str",
"description": "Id des Herstellers der Einheit",
"example": 923
"example": 923,
}

return function_docs
Expand All @@ -193,7 +211,11 @@ def _collect_columns_of_base_type(base_types, base_type_name, fcn_data):
fcn_data += type_description["extension"]["sequence"]["element"]

if "@base" in type_description["extension"]:
if not type_description["extension"]["@base"] == 'mastr:AntwortBasis':
fcn_data = _collect_columns_of_base_type(base_types, type_description["extension"]["@base"].split(":")[1], fcn_data)
if not type_description["extension"]["@base"] == "mastr:AntwortBasis":
fcn_data = _collect_columns_of_base_type(
base_types,
type_description["extension"]["@base"].split(":")[1],
fcn_data,
)

return fcn_data

0 comments on commit a63e962

Please sign in to comment.