stimmenfryslan/stimmen/cbs.py

103 lines
3.4 KiB
Python

from threading import Thread
import time
from pygeoif.geometry import mapping
from shapely.geometry import shape, box
from stimmen.shapefile import shapefiles_to_geojson
import os.path
def data_file(*args):
return os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'..',
*args
)
__cache = {}
__last_access = None
def clear_cache_poll():
global __cache, __last_access
while True:
time.sleep(60*60)
if (__last_access is not None and time.time() - __last_access) > 60*60:
__cache = {}
Thread(target=clear_cache_poll, daemon=True).start()
def get_available_provinces():
file = data_file('data', 'geo', 'cbs', "2018_Imergis_provinciegrenzen_met_water")
provinces = shapefiles_to_geojson(file)['features']
return {province['properties']['provincien'] for province in provinces}
def province_geojson(province, with_water=False):
global __cache, __last_access
file = data_file(
'data', 'geo', 'cbs',
"2018_Imergis_provinciegrenzen_met_water"
if with_water else
"2018-Imergis_provinciegrenzen_kustlijn"
)
if (province, with_water) not in __cache:
provinces = shapefiles_to_geojson(file)['features']
available_provinces = get_available_provinces()
assert province in available_provinces, "Province {} not found, options: {}".format(
province, ', '.join(available_provinces))
__cache[(province, with_water)] = next(
province_ for province_ in provinces if province_['properties']['provincien'] == province)
__last_access = time.time()
return __cache[(province, with_water)]
def expand_box(x0, y0, x1, y1, f=0.1):
return (x0 - (x1 - x0)*f, y0 - (y1 - y0)*f,
x1 + (x1 - x0)*f, y1 + (y1 - y0)*f)
def gwb_in_province(
province='Friesland', region_level='wijk', region_year='2018',
polygon_simplification=0.001, province_dilation=0.0005,
bounding_box_dilation=0.01
):
assert region_level in {'gem', 'wijk', 'buurt'}, (
"region_level {} not supported, must be gem, wijk or buurt".format(region_level))
assert region_year in {2017, 2018}, (
"region_year {} not supported, must 2017 or 2018".format(region_year))
province_with_water = shape(province_geojson(province, with_water=True)['geometry'])
province_bounding_box = box(*expand_box(*province_with_water.bounds, f=bounding_box_dilation))
province_land_only = shape(province_geojson(province, with_water=False)['geometry'])
province_land_only_dilated = province_land_only.buffer(-province_dilation)
geojson = shapefiles_to_geojson(
data_file('data', 'geo', 'cbs', '{}_{}'.format(region_level, region_year)))
shapes = [shape(geojson_['geometry']) for geojson_ in geojson['features']]
shapes, geojson = map(list, zip(*(
(
(intersection.simplify(tolerance=polygon_simplification), geojson_)
if polygon_simplification is not None else (intersection, geojson_)
)
for shape_, geojson_ in zip(shapes, geojson['features'])
if province_bounding_box.contains(shape_)
for intersection in [shape_.intersection(province_land_only_dilated)] # alias
if intersection.area > 0
)))
for geojson_, shape_ in zip(geojson, shapes):
geojson_['geometry'] = mapping(shape_)
return {"type": "FeatureCollection", "features": geojson}