def filter_buildings(cadastral_buildings, osm_refs):
    """Return the cadastral buildings that are not yet present in OSM.

    Each building's ``properties['ref:bygningsnr']`` value is parsed with
    ``shared.parse_ref``.  A building is kept only when none of its parsed
    references occurs in ``osm_refs``.  The order of the input is preserved.
    """
    missing = []
    for candidate in cadastral_buildings:
        candidate_refs = shared.parse_ref(
            candidate['properties']['ref:bygningsnr'])
        # An empty intersection means OSM knows none of this building's refs.
        if not (candidate_refs & osm_refs):
            missing.append(candidate)
    return missing
required=True) parser.add_argument('--output', required=True) - parser.add_argument('--municipality', required=True, type=int) + parser.add_argument('--municipality', required=True) args = parser.parse_args() - with open(args.input, 'r', encoding='utf-8') as file: - data = json.load(file) - import_buildings = data['features'] - print('Loaded {} buildings'.format(len(import_buildings))) + muni_id = shared.handle_municipality_argument(args.municipality) - osm_refs = load_osm_refs(args.municipality) - print('Loaded {} unique references from OSM'.format(len(osm_refs))) + with open(args.input, 'r', encoding='utf-8') as file: + cadastral = shared.parse_cadastral_data(file.read()) + print(f'Loaded {len(cadastral)} buildings') - def in_osm(building): - raw_ref = building['properties']['ref:bygningsnr'] - building_refs = parse_ref(raw_ref) - return bool(building_refs & osm_refs) + osm_raw = shared.load_building_tags(muni_id) + osm_refs = load_osm_refs(osm_raw) + print(f'Loaded {len(osm_refs)} unique references from OSM') - missing_in_osm = [b for b in import_buildings if not in_osm(b)] - print('Writing {} buildings missing from OSM'.format(len(missing_in_osm))) + output = filter_buildings(cadastral, osm_refs) + print(f'Writing {len(output)} buildings missing from OSM') with open(args.output, 'w', encoding='utf-8') as file: - geojson = { - 'type': 'FeatureCollection', - 'generator': 'filter_buildings.py', - 'features': missing_in_osm, - } - json.dump(geojson, file) + file.write(shared.format_geojson(output)) return 0 diff --git a/find_lifecycle_updates.py b/find_lifecycle_updates.py new file mode 100644 index 0000000..9329872 --- /dev/null +++ b/find_lifecycle_updates.py @@ -0,0 +1,99 @@ +import argparse +import json +import re +import sys + +import shared + + +def osm_buildings_by_ref(osm_buildings): + by_ref = {} + for osm_building in osm_buildings: + tags = osm_building['tags'] + raw_ref = tags['ref:bygningsnr'] + for osm_ref in shared.parse_ref(raw_ref): + try: + 
def cadastral_construction_finished(building):
    """Tell whether the cadastral data marks a building as finished.

    The decision is based on ``building['properties']['STATUS']``: a status
    that starts with ``'#RA '`` or ``'#IG '`` (for example
    ``'#IG Igangsettingstillatelse'``) means construction is not finished.
    Any other status counts as finished.

    Raises:
        RuntimeError: if the building has no ``STATUS`` property.  The
            message names the building's ``ref:bygningsnr`` when it has one,
            so the offending record can be found in the input file.
    """
    tags = building['properties']
    try:
        status = tags['STATUS']
    except KeyError:
        ref = tags.get('ref:bygningsnr', '<unknown>')
        raise RuntimeError(
            f'Cadastral building {ref} has no STATUS property') from None

    # re.match anchors at the start; nothing after the prefix matters.
    return re.match(r'#(RA|IG) ', status) is None
def find_removed(cadastral_buildings, osm_buildings):
    """Return output features for OSM buildings gone from the cadastre.

    An OSM building is reported when at least one of the building numbers in
    its ``ref:bygningsnr`` tag is absent from ``cadastral_buildings``.  Each
    such building is converted with ``to_output`` and reported exactly once,
    in the order it appears in ``osm_buildings``, even when several of its
    building numbers have been removed.
    """
    cadastral_refs = collect_refs(cadastral_buildings)

    removed_buildings = []
    # One pass over the OSM buildings: each is parsed once and emitted at
    # most once, however many of its refs are missing.
    for osm_building in osm_buildings:
        if not collect_refs([osm_building]) - cadastral_refs:
            continue

        try:
            removed_buildings.append(to_output(osm_building))
        except Exception:
            # Show the element that could not be converted, then let the
            # original error propagate unchanged.
            print(osm_building)
            raise

    return removed_buildings
def format_geojson(features, generator='filter_buildings.py'):
    """Serialise ``features`` as a GeoJSON FeatureCollection string.

    Args:
        features: list of GeoJSON feature objects, stored under the
            ``features`` member as given.
        generator: value written to the ``generator`` member.  It defaults
            to ``'filter_buildings.py'``, the value this function has always
            written, so existing callers get identical output.  Other
            scripts can pass their own name to label their output.

    Returns:
        The JSON text of the collection, as produced by ``json.dumps``.
    """
    geojson = {
        'type': 'FeatureCollection',
        'generator': generator,
        'features': features,
    }
    return json.dumps(geojson)
def handle_municipality_argument(municipality):
    """Turn the ``--municipality`` argument into a municipality id.

    A value consisting of exactly four ASCII digits is taken to be an id
    already and is returned unchanged.  Anything else is treated as a
    (partial) municipality name: the list of municipalities is fetched with
    ``load_municipalities`` and searched with ``resolve_municipality_id``.

    The process is terminated through ``sys.exit`` with an explanatory
    message when the name matches no municipality or more than one.
    """
    # fullmatch, not match: match only anchors at the start, so values such
    # as '12345' or '0301];...' would be accepted as ids and later end up
    # verbatim in the Overpass query.
    if re.fullmatch('[0-9]{4}', municipality):
        return municipality

    municipalities = load_municipalities()
    try:
        return resolve_municipality_id(
            municipalities, municipality)
    except NoResults:
        sys.exit(f'Municipality {municipality} not found')
    except MultipleResults as e:
        matches = ', '.join(
            [f'{item["id"]}/{item["name"]}' for item in e.results])
        sys.exit('Found multiple matching municipalities: {}'.format(
            matches))
def cadastral(ref, status):
    """Build a minimal cadastral building fixture.

    ``status`` is a short code, ``'MB'`` or ``'IG'``, which is expanded to
    the full status string stored under ``STATUS``.  Any other code raises
    RuntimeError.  ``ref`` is stored as a string under ``ref:bygningsnr``.
    """
    known_statuses = (
        ('MB', '#MB Midlertidig brukstillatelse'),
        ('IG', '#IG Igangsettingstillatelse'),
    )
    for code, full_status in known_statuses:
        if status == code:
            break
    else:
        # No known code matched.
        raise RuntimeError

    properties = {
        'ref:bygningsnr': str(ref),
        'STATUS': full_status,
    }
    return {'properties': properties}
def osm_way(ref):
    """Build an OSM way fixture for a building with the given reference.

    The way carries a ``center`` position of lat 59.0 / lon 11.0 and the
    tags ``building=yes`` and ``ref:bygningsnr=<ref as string>``.
    """
    center = {'lat': 59.0, 'lon': 11.0}
    tags = {'building': 'yes', 'ref:bygningsnr': str(ref)}
    return {'type': 'way', 'center': center, 'tags': tags}
test_output_removed_building_node(self): + removed = self._find_removed([], [osm_node(1)]) + self.assertEqual([expected_output_point], removed) + + def test_output_removed_building_way(self): + removed = self._find_removed([], [osm_way(1)]) + self.assertEqual([expected_output_point], removed) diff --git a/tests/test_shared.py b/tests/test_shared.py new file mode 100644 index 0000000..d51324b --- /dev/null +++ b/tests/test_shared.py @@ -0,0 +1,61 @@ +import unittest + +import shared + + +class TestMuncipalityResolution(unittest.TestCase): + def setUp(self): + self.municipalities = { + '0301': 'Oslo', + # '0231': 'Skedsmo', + '3018': 'Våler', + '3030': 'Lillestrøm', + '3419': 'Våler', + '4215': 'Lillesand', + '4637': 'Hyllestad', + } + + def _resolve(self, muni_name): + return shared.resolve_municipality_id( + self.municipalities, + muni_name) + + def _assert_resolves_to(self, muni_name, muni_id): + self.assertEqual(muni_id, self._resolve(muni_name)) + + def test_resolve_municipality(self): + self._assert_resolves_to('Lillestrøm', '3030') + + def test_resolve_zero_prefix(self): + self._assert_resolves_to('Oslo', '0301') + + def test_resolve_duplicate_name(self): + with self.assertRaises(shared.MultipleResults) as cm: + self._resolve('Våler') + + self.assertEqual(cm.exception.results, [ + {'id': '3018', 'name': 'Våler'}, + {'id': '3419', 'name': 'Våler'}, + ]) + + def test_resolve_missing(self): + with self.assertRaises(shared.NoResults): + self._resolve('Skedsmo') + + def test_resolve_with_different_case(self): + self._assert_resolves_to('lILLESTRØM', '3030') + + def test_resolve_using_prefix(self): + self._assert_resolves_to('Lillest', '3030') + + def test_prefix_resolution_to_multiple_results(self): + with self.assertRaises(shared.MultipleResults) as cm: + self._resolve('Lilles') + + self.assertEqual(cm.exception.results, [ + {'id': '3030', 'name': 'Lillestrøm'}, + {'id': '4215', 'name': 'Lillesand'}, + ]) + + def test_resolve_with_infix_match(self): + 
self._assert_resolves_to('llestr', '3030')