import folium from jupyter_progressbar import ProgressBar from matplotlib import pyplot from pygeoif.geometry import mapping from shapely.geometry.geo import shape, box from stimmen.cbs import data_file from html import escape import numpy as np from stimmen.latitude_longitude import reverse_latitude_longitude import tempfile import time from selenium import webdriver from .folium_injections import * from .folium_colorbar import * def get_palette(n, no_black=True, no_white=True): with open(data_file('data', 'glasbey', '{}_colors.txt'.format(n + no_black + no_white))) as f: return [ '#%02x%02x%02x' % tuple(int(c) for c in line.replace('\n', '').split(',')) for line in f if not no_black or line != '0,0,0\n' if not no_white or line != '255,255,255\n' ] def colored_name(name, color): return '{}'.format(color, name) def region_area_cdf(region_shape, resolution=10000): xmin, ymin, xmax, ymax = region_shape.bounds shape_area = region_shape.area spaces = np.linspace(xmin, xmax, resolution + 1) return np.array([ box(xmin, ymin, xmax_, ymax).intersection(region_shape).area / shape_area for xmax_ in spaces ]) # Only slightly faster than region_area_cdf. # def fast_sliced_shape_areas(region_shape, recursions=13): # results = np.zeros(2 ** recursions) # xmin, ymin, xmax, ymax = region_shape.bounds # total = 0 # # def f(shape_, xmin, ymin, xmax, ymax, recursions, results_): # nonlocal total # shape_ = box(xmin, ymin, xmax, ymax).intersection(shape_) # if recursions == 0: # assert results_.shape == (1,) # results_[0] = shape_.area # total += shape_.area # else: # xmiddle = xmin + (xmax - xmin) / 2 # middle_index = len(results_) // 2 # f(shape_, xmin, ymin, xmiddle, ymax, recursions - 1, results_[:middle_index]) # f(shape_, xmiddle, ymin, xmax, ymax, recursions - 1, results_[middle_index:]) # # f(region_shape, xmin, ymin, xmax, ymax, recursions, results) # return results / results.sum() * region_shape.area def area_adjust_boundaries(region_shape, boundaries, region_cdf_cache=None, resolution=10000): """Adjust the boundaries from percentage of the width of a shape, to percentage of the area of a shape""" if region_cdf_cache is None: region_cdf_cache = region_area_cdf(region_shape, resolution) elif not isinstance(region_cdf_cache, np.ndarray): region_cdf_cache = np.array(region_cdf_cache) return width_adjust_boundaries( region_shape, np.abs(region_cdf_cache[None, :] - boundaries[:, None]).argmin(axis=1) / resolution ) def width_adjust_boundaries(region_shape, boundaries): xmin, _, xmax, _ = region_shape.bounds return boundaries * (xmax - xmin) + xmin def pronunciation_bars( regions, dataframe, region_name_property, region_name_column, group_column='answer_text', count_column=None, cutoff_percentage=0.05, normalize_area=True, progress_bar=False, area_adjust_resolution=10000, simplify_shapes=None, ): # all values of group_column that appear at least cutoff_percentage in one of the regions relevant_groups = { group for region_name, region_rows in dataframe.groupby(region_name_column) for group, aggregation in region_rows.groupby( group_column).agg({group_column: len}).iterrows() if aggregation[group_column] >= cutoff_percentage * len(region_rows) } group_to_color = dict(zip(relevant_groups, get_palette(len(relevant_groups)))) group_to_color['other'] = '#ccc' n_other = len(dataframe) - sum( sum(dataframe[group_column] == group_value) for group_value in relevant_groups ) # Each FeatureGroup represents all polygons (one for each region) of the relevant_groups feature_groups = { group_value: folium.FeatureGroup( name=colored_name( '{value} ({amount})'.format(value=escape(group_value), amount=amount), color ), overlay=True ) for group_value, color in group_to_color.items() for amount in [ sum(dataframe[group_column] == group_value) if group_value != 'other' else n_other ] # alias if amount > 0 } progress_bar = ProgressBar if progress_bar else lambda x: x # for each region, create the bar-polygons. for feature in progress_bar(regions['features']): region_name = feature['properties'][region_name_property] region_rows = dataframe[dataframe[region_name_column] == region_name] region_shape = shape(feature['geometry']) if simplify_shapes: region_shape = region_shape.simplify(simplify_shapes) _, ymin, _, ymax = region_shape.bounds group_values_occurrence = { group_value: aggregation[group_column] for group_value, aggregation in region_rows.groupby(group_column).agg({group_column: len}).iterrows() if group_value in relevant_groups } group_values_occurrence['other'] = len(region_rows) - sum(group_values_occurrence.values()) group_values, group_occurrences = zip(*sorted( group_values_occurrence.items(), key=lambda x: (x[0] == 'other', -x[1]) )) group_percentages = np.array(group_occurrences) / max(1, len(region_rows)) group_boundaries = np.cumsum((0,) + group_occurrences) / max(1, len(region_rows)) if normalize_area: if '__region_shape_cdf_cache' not in feature['properties']: feature['properties']['__region_shape_cdf_cache'] = region_area_cdf( region_shape, resolution=area_adjust_resolution).tolist() group_boundaries = area_adjust_boundaries( region_shape, group_boundaries, region_cdf_cache=feature['properties']['__region_shape_cdf_cache'], resolution=area_adjust_resolution ) else: group_boundaries = width_adjust_boundaries(region_shape, group_boundaries) for group_value, percentage, count, left_boundary, right_boundary in zip( group_values, group_percentages, group_occurrences, group_boundaries[:-1], group_boundaries[1:] ): if count == 0 or left_boundary == right_boundary: continue bar_shape = region_shape.intersection(box(left_boundary, ymin, right_boundary, ymax)) if bar_shape.area == 0 or group_occurrences == 0: continue polygon = folium.Polygon( reverse_latitude_longitude(mapping(bar_shape)['coordinates']), fill_color=group_to_color[group_value], fill_opacity=0.8, color=None, popup='{} ({}, {: 3d}%)'.format(group_value, count, int(round(100 * percentage))) ) polygon._bar_shape = bar_shape polygon.add_to(feature_groups[group_value]) return feature_groups def shape_label(region_shape, label, font_size=12): return folium.map.Marker( [region_shape.centroid.y, region_shape.centroid.x], icon=folium.DivIcon( icon_size=(50 / 12 * font_size, 24 / 12 * font_size), icon_anchor=(25 / 12 * font_size, font_size), html=( '
' '{}
').format(font_size, font_size, label), ) ) def pronunciation_heatmaps( regions, dataframe, region_name_property, region_name_column, group_column='answer_text', cmap=pyplot.get_cmap('YlOrRd'), label_font_size=12, min_percentage=None, max_percentage=None, show_labels=False ): def hex_color(percentage): return '#{:02x}{:02x}{:02x}'.format(*( int(255 * c) for c in cmap(percentage)[:3] )) group_value_order, group_value_occurrence = zip(*sorted( ((group_value, len(rows)) for group_value, rows in dataframe.groupby(group_column)), key=lambda x: -x[1] )) occurrence_in_region = { region_name: len(region_rows) for region_name, region_rows in dataframe.groupby(region_name_column) } max_group_value_occurrence_in_region = [ max( (region_rows[group_column] == group_value).sum() / occurrence_in_region[region_name] for region_name, region_rows in dataframe.groupby(region_name_column) ) for group_value in group_value_order # for _ in [print(group_value)] # hack ] feature_groups = [ folium.FeatureGroup( name='{} ({})'.format(group_value, occurrence), overlay=False ) for group_value, occurrence in zip(group_value_order, group_value_occurrence) ] for group in feature_groups: folium.TileLayer(tiles='stamentoner').add_to(group) for feature in regions['features']: region_name = feature['properties'][region_name_property] region_rows = dataframe[dataframe[region_name_column] == region_name] region_shape = shape(feature['geometry']) region_occurrence = occurrence_in_region.get(region_name, 1); group_value_occurrence_in_region = [ (region_rows[group_column] == group_value).sum() for group_value in group_value_order ] for group_value, value_occurrence_in_region, value_occurrence, max_group_value_occurrence, feature_group in zip( group_value_order, group_value_occurrence_in_region, group_value_occurrence, max_group_value_occurrence_in_region, feature_groups ): percentage = value_occurrence_in_region / region_occurrence if max_percentage is not None: max_group_value_occurrence = max_percentage / 100 min_value = min_percentage / 100 if min_percentage is not None else 0 scale_value = percentage - min_value / (max_group_value_occurrence - min_value) polygon = folium.Polygon( reverse_latitude_longitude(feature['geometry']['coordinates']), fill_color=hex_color(scale_value) if value_occurrence_in_region > 0 else '#888', color='#000000', fill_opacity=0.8, popup='{} ({}, {: 3d}%)'.format( # ‰ region_name[:50], value_occurrence_in_region, int(round(100 * percentage)) ) ) polygon.add_to(feature_group) if show_labels and value_occurrence_in_region > 0: shape_label( region_shape, '{:d}%'.format(int(round(100 * percentage))), # ‰ font_size=label_font_size ).add_to(feature_group) return dict(zip(group_value_order, feature_groups)) def scatter_pronunciation_map( dataframe, latitude_column, longitude_column, group_column, split_at_groups=6 ): std = (0.0189, 0.0135) group_values, group_value_occurrences = zip(*sorted( ((group_value, len(group_rows)) for group_value, group_rows in dataframe.groupby(group_column)), key=lambda x: -x[1] )) maps = ( [group_values, group_values[:split_at_groups], group_values[split_at_groups:]] if len(group_values) > split_at_groups else [group_values] ) result_names = ['all', 'most_occurring', 'least_occurring'] results = {name: [] for name in result_names} for map, map_name in zip(maps, result_names): colors = get_palette(len(map)) for group_value, group_color in zip(map, colors): group_rows = dataframe[dataframe[group_column] == group_value] group_name = '{} ({})'.format( group_color, escape(group_value), len(group_rows)) results[map_name].append(folium.FeatureGroup(name=group_name)) for point in zip(group_rows[latitude_column], group_rows[longitude_column]): point = tuple(p + s * np.random.randn() for p, s in zip(point, std)) folium.Circle( point, color=None, fill_color=group_color, radius=400 * min(1., 100 / len(group_rows)), fill_opacity=1 ).add_to(results[map_name][-1]) return results def bar_map_css(legend_fontsize='30pt', attribution_fontsize='14pt'): return FoliumCSS(""" .leaflet-control-container .leaflet-control-layers-base {{ display: none; }} .leaflet-control-container .leaflet-control-layers-separator {{ display: none; }} .leaflet-control-container .leaflet-control-layers-overlays {{ display: flex }} .leaflet-control-container .leaflet-control-layers-overlays label:not(:last-child) {{ margin-right: 15px; }} .leaflet-control-container .leaflet-control-layers-overlays label span.with-block::before {{ content: '■ '; color: inherit; }} .leaflet-control-container .leaflet-control-layers-overlays label {{ margin-bottom: 0px; font-size: {legend_fontsize}; }} .leaflet-control-container .leaflet-control-layers-overlays label input {{ display: none; }} .leaflet-control-attribution a {{ display: none; }} .leaflet-control-attribution.leaflet-control-attribution.leaflet-control-attribution.leaflet-control-attribution {{ background-color: white; font-size: {attribution_fontsize}; }} """.format(legend_fontsize=legend_fontsize, attribution_fontsize=attribution_fontsize)) def save_map(m, filename, resolution=(1600, 1400), headless=True): f = tempfile.NamedTemporaryFile(delete=False, suffix='.html') f.close() m.save(f.name) options = webdriver.ChromeOptions() options.add_argument('--window-size={1},{0}'.format(*resolution)) if headless: options.add_argument('--headless') browser = webdriver.Chrome(options=options) browser.get("file://" + f.name) time.sleep(1) browser.save_screenshot(filename) browser.quit() f.delete