395 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			395 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import folium
 | 
						|
from jupyter_progressbar import ProgressBar
 | 
						|
from matplotlib import pyplot
 | 
						|
from pygeoif.geometry import mapping
 | 
						|
from shapely.geometry.geo import shape, box
 | 
						|
 | 
						|
from stimmen.cbs import data_file
 | 
						|
from html import escape
 | 
						|
import numpy as np
 | 
						|
 | 
						|
from stimmen.latitude_longitude import reverse_latitude_longitude
 | 
						|
 | 
						|
import tempfile
 | 
						|
import time
 | 
						|
from selenium import webdriver
 | 
						|
from .folium_injections import *
 | 
						|
from .folium_colorbar import *
 | 
						|
 | 
						|
 | 
						|
def get_palette(n, no_black=True, no_white=True):
 | 
						|
    with open(data_file('data', 'glasbey', '{}_colors.txt'.format(n + no_black + no_white))) as f:
 | 
						|
        return [
 | 
						|
            '#%02x%02x%02x' % tuple(int(c) for c in line.replace('\n', '').split(','))
 | 
						|
            for line in f
 | 
						|
            if not no_black or line != '0,0,0\n'
 | 
						|
            if not no_white or line != '255,255,255\n'
 | 
						|
        ]
 | 
						|
 | 
						|
 | 
						|
def colored_name(name, color):
 | 
						|
    return '<span class=\\"with-block\\" style=\\"color:{}; \\"><span class=\\"blackable; \\">{}</span></span>'.format(color, name)
 | 
						|
 | 
						|
 | 
						|
def region_area_cdf(region_shape, resolution=10000):
 | 
						|
    xmin, ymin, xmax, ymax = region_shape.bounds
 | 
						|
    shape_area = region_shape.area
 | 
						|
    spaces = np.linspace(xmin, xmax, resolution + 1)
 | 
						|
    return np.array([
 | 
						|
        box(xmin, ymin, xmax_, ymax).intersection(region_shape).area / shape_area
 | 
						|
        for xmax_ in spaces
 | 
						|
    ])
 | 
						|
 | 
						|
 | 
						|
# Only slightly faster than region_area_cdf.
 | 
						|
# def fast_sliced_shape_areas(region_shape, recursions=13):
 | 
						|
#     results = np.zeros(2 ** recursions)
 | 
						|
#     xmin, ymin, xmax, ymax = region_shape.bounds
 | 
						|
#     total = 0
 | 
						|
#
 | 
						|
#     def f(shape_, xmin, ymin, xmax, ymax, recursions, results_):
 | 
						|
#         nonlocal total
 | 
						|
#         shape_ = box(xmin, ymin, xmax, ymax).intersection(shape_)
 | 
						|
#         if recursions == 0:
 | 
						|
#             assert results_.shape == (1,)
 | 
						|
#             results_[0] = shape_.area
 | 
						|
#             total += shape_.area
 | 
						|
#         else:
 | 
						|
#             xmiddle = xmin + (xmax - xmin) / 2
 | 
						|
#             middle_index = len(results_) // 2
 | 
						|
#             f(shape_, xmin, ymin, xmiddle, ymax, recursions - 1, results_[:middle_index])
 | 
						|
#             f(shape_, xmiddle, ymin, xmax, ymax, recursions - 1, results_[middle_index:])
 | 
						|
#
 | 
						|
#     f(region_shape, xmin, ymin, xmax, ymax, recursions, results)
 | 
						|
#     return results / results.sum() * region_shape.area
 | 
						|
 | 
						|
 | 
						|
def area_adjust_boundaries(region_shape, boundaries, region_cdf_cache=None, resolution=10000):
 | 
						|
    """Adjust the boundaries from percentage of the width of a shape, to percentage of the area of a shape"""
 | 
						|
    if region_cdf_cache is None:
 | 
						|
        region_cdf_cache = region_area_cdf(region_shape, resolution)
 | 
						|
    elif not isinstance(region_cdf_cache, np.ndarray):
 | 
						|
        region_cdf_cache = np.array(region_cdf_cache)
 | 
						|
    return width_adjust_boundaries(
 | 
						|
        region_shape,
 | 
						|
        np.abs(region_cdf_cache[None, :] - boundaries[:, None]).argmin(axis=1) / resolution
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def width_adjust_boundaries(region_shape, boundaries):
 | 
						|
    xmin, _, xmax, _ = region_shape.bounds
 | 
						|
    return boundaries * (xmax - xmin) + xmin
 | 
						|
 | 
						|
 | 
						|
def pronunciation_bars(
 | 
						|
    regions, dataframe,
 | 
						|
    region_name_property, region_name_column,
 | 
						|
    group_column='answer_text',
 | 
						|
    count_column=None,
 | 
						|
    cutoff_percentage=0.05,
 | 
						|
    normalize_area=True,
 | 
						|
    progress_bar=False,
 | 
						|
    area_adjust_resolution=10000,
 | 
						|
    simplify_shapes=None,
 | 
						|
):
 | 
						|
    # all values of group_column that appear at least cutoff_percentage in one of the regions
 | 
						|
    relevant_groups = {
 | 
						|
        group
 | 
						|
        for region_name, region_rows in dataframe.groupby(region_name_column)
 | 
						|
        for group, aggregation in region_rows.groupby(
 | 
						|
            group_column).agg({group_column: len}).iterrows()
 | 
						|
        if aggregation[group_column] >= cutoff_percentage * len(region_rows)
 | 
						|
    }
 | 
						|
 | 
						|
    group_to_color = dict(zip(relevant_groups, get_palette(len(relevant_groups))))
 | 
						|
    group_to_color['other'] = '#ccc'
 | 
						|
 | 
						|
    n_other = len(dataframe) - sum(
 | 
						|
        sum(dataframe[group_column] == group_value)
 | 
						|
        for group_value in relevant_groups
 | 
						|
    )
 | 
						|
 | 
						|
    # Each FeatureGroup represents all polygons (one for each region) of the relevant_groups
 | 
						|
    feature_groups = {
 | 
						|
        group_value: folium.FeatureGroup(
 | 
						|
            name=colored_name(
 | 
						|
                '{value} <span class=\\"amount\\">({amount})</span>'.format(value=escape(group_value), amount=amount),
 | 
						|
                color
 | 
						|
            ),
 | 
						|
            overlay=True
 | 
						|
        )
 | 
						|
        for group_value, color in group_to_color.items()
 | 
						|
        for amount in [
 | 
						|
        sum(dataframe[group_column] == group_value)
 | 
						|
        if group_value != 'other' else
 | 
						|
        n_other
 | 
						|
    ]  # alias
 | 
						|
        if amount > 0
 | 
						|
    }
 | 
						|
 | 
						|
    progress_bar = ProgressBar if progress_bar else lambda x: x
 | 
						|
 | 
						|
    # for each region, create the bar-polygons.
 | 
						|
    for feature in progress_bar(regions['features']):
 | 
						|
        region_name = feature['properties'][region_name_property]
 | 
						|
        region_rows = dataframe[dataframe[region_name_column] == region_name]
 | 
						|
        region_shape = shape(feature['geometry'])
 | 
						|
        if simplify_shapes:
 | 
						|
            region_shape = region_shape.simplify(simplify_shapes)
 | 
						|
        _, ymin, _, ymax = region_shape.bounds
 | 
						|
 | 
						|
        group_values_occurrence = {
 | 
						|
            group_value: aggregation[group_column]
 | 
						|
            for group_value, aggregation in region_rows.groupby(group_column).agg({group_column: len}).iterrows()
 | 
						|
            if group_value in relevant_groups
 | 
						|
        }
 | 
						|
        group_values_occurrence['other'] = len(region_rows) - sum(group_values_occurrence.values())
 | 
						|
        group_values, group_occurrences = zip(*sorted(
 | 
						|
            group_values_occurrence.items(),
 | 
						|
            key=lambda x: (x[0] == 'other', -x[1])
 | 
						|
        ))
 | 
						|
 | 
						|
        group_percentages = np.array(group_occurrences) / max(1, len(region_rows))
 | 
						|
        group_boundaries = np.cumsum((0,) + group_occurrences) / max(1, len(region_rows))
 | 
						|
        if normalize_area:
 | 
						|
            if '__region_shape_cdf_cache' not in feature['properties']:
 | 
						|
                feature['properties']['__region_shape_cdf_cache'] = region_area_cdf(
 | 
						|
                    region_shape, resolution=area_adjust_resolution).tolist()
 | 
						|
            group_boundaries = area_adjust_boundaries(
 | 
						|
                region_shape, group_boundaries,
 | 
						|
                region_cdf_cache=feature['properties']['__region_shape_cdf_cache'],
 | 
						|
                resolution=area_adjust_resolution
 | 
						|
            )
 | 
						|
        else:
 | 
						|
            group_boundaries = width_adjust_boundaries(region_shape, group_boundaries)
 | 
						|
 | 
						|
        for group_value, percentage, count, left_boundary, right_boundary in zip(
 | 
						|
                group_values,
 | 
						|
                group_percentages,
 | 
						|
                group_occurrences,
 | 
						|
                group_boundaries[:-1], group_boundaries[1:]
 | 
						|
        ):
 | 
						|
            if count == 0 or left_boundary == right_boundary:
 | 
						|
                continue
 | 
						|
 | 
						|
            bar_shape = region_shape.intersection(box(left_boundary, ymin, right_boundary, ymax))
 | 
						|
            if bar_shape.area == 0 or group_occurrences == 0:
 | 
						|
                continue
 | 
						|
            polygon = folium.Polygon(
 | 
						|
                reverse_latitude_longitude(mapping(bar_shape)['coordinates']),
 | 
						|
                fill_color=group_to_color[group_value],
 | 
						|
                fill_opacity=0.8,
 | 
						|
                color=None,
 | 
						|
                popup='{} ({}, {: 3d}%)'.format(group_value, count, int(round(100 * percentage)))
 | 
						|
            )
 | 
						|
            polygon._bar_shape = bar_shape
 | 
						|
            polygon.add_to(feature_groups[group_value])
 | 
						|
 | 
						|
    return feature_groups
 | 
						|
 | 
						|
 | 
						|
def shape_label(region_shape, label, font_size=12):
 | 
						|
    return folium.map.Marker(
 | 
						|
        [region_shape.centroid.y, region_shape.centroid.x],
 | 
						|
        icon=folium.DivIcon(
 | 
						|
            icon_size=(50 / 12 * font_size, 24 / 12 * font_size),
 | 
						|
            icon_anchor=(25 / 12 * font_size, font_size),
 | 
						|
            html=(
 | 
						|
                '<div class="percentage-label" style="font-size: {}pt; '
 | 
						|
                'background-color: rgba(255,255,255,0.8); border-radius: {}px; text-align: center;">'
 | 
						|
                '{}</div>').format(font_size, font_size, label),
 | 
						|
        )
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
def pronunciation_heatmaps(
 | 
						|
    regions, dataframe,
 | 
						|
    region_name_property, region_name_column,
 | 
						|
    group_column='answer_text',
 | 
						|
    cmap=pyplot.get_cmap('YlOrRd'),
 | 
						|
    label_font_size=12,
 | 
						|
    min_percentage=None, max_percentage=None,
 | 
						|
    show_labels=False
 | 
						|
):
 | 
						|
    def hex_color(percentage):
 | 
						|
        return '#{:02x}{:02x}{:02x}'.format(*(
 | 
						|
            int(255 * c)
 | 
						|
            for c in cmap(percentage)[:3]
 | 
						|
        ))
 | 
						|
 | 
						|
    group_value_order, group_value_occurrence = zip(*sorted(
 | 
						|
        ((group_value, len(rows)) for group_value, rows in dataframe.groupby(group_column)),
 | 
						|
        key=lambda x: -x[1]
 | 
						|
    ))
 | 
						|
 | 
						|
    occurrence_in_region = {
 | 
						|
        region_name: len(region_rows)
 | 
						|
        for region_name, region_rows in dataframe.groupby(region_name_column)
 | 
						|
    }
 | 
						|
 | 
						|
    max_group_value_occurrence_in_region = [
 | 
						|
        max(
 | 
						|
            (region_rows[group_column] == group_value).sum() / occurrence_in_region[region_name]
 | 
						|
            for region_name, region_rows in dataframe.groupby(region_name_column)
 | 
						|
        )
 | 
						|
        for group_value in group_value_order
 | 
						|
        # for _ in [print(group_value)] # hack
 | 
						|
    ]
 | 
						|
 | 
						|
    feature_groups = [
 | 
						|
        folium.FeatureGroup(
 | 
						|
            name='{} ({})'.format(group_value, occurrence),
 | 
						|
            overlay=False
 | 
						|
        )
 | 
						|
        for group_value, occurrence in zip(group_value_order, group_value_occurrence)
 | 
						|
    ]
 | 
						|
    for group in feature_groups:
 | 
						|
        folium.TileLayer(tiles='stamentoner').add_to(group)
 | 
						|
 | 
						|
    for feature in regions['features']:
 | 
						|
        region_name = feature['properties'][region_name_property]
 | 
						|
        region_rows = dataframe[dataframe[region_name_column] == region_name]
 | 
						|
        region_shape = shape(feature['geometry'])
 | 
						|
        region_occurrence = occurrence_in_region.get(region_name, 1);
 | 
						|
 | 
						|
        group_value_occurrence_in_region = [
 | 
						|
            (region_rows[group_column] == group_value).sum()
 | 
						|
            for group_value in group_value_order
 | 
						|
        ]
 | 
						|
 | 
						|
        for group_value, value_occurrence_in_region, value_occurrence, max_group_value_occurrence, feature_group in zip(
 | 
						|
                group_value_order,
 | 
						|
                group_value_occurrence_in_region,
 | 
						|
                group_value_occurrence,
 | 
						|
                max_group_value_occurrence_in_region,
 | 
						|
                feature_groups
 | 
						|
        ):
 | 
						|
            percentage = value_occurrence_in_region / region_occurrence
 | 
						|
            if max_percentage is not None:
 | 
						|
                max_group_value_occurrence = max_percentage / 100
 | 
						|
            min_value = min_percentage / 100 if min_percentage is not None else 0
 | 
						|
            scale_value = percentage - min_value / (max_group_value_occurrence - min_value)
 | 
						|
            polygon = folium.Polygon(
 | 
						|
                reverse_latitude_longitude(feature['geometry']['coordinates']),
 | 
						|
                fill_color=hex_color(scale_value) if value_occurrence_in_region > 0 else '#888',
 | 
						|
                color='#000000',
 | 
						|
                fill_opacity=0.8,
 | 
						|
                popup='{} ({}, {: 3d}%)'.format( # ‰
 | 
						|
                    region_name[:50], value_occurrence_in_region,
 | 
						|
                    int(round(100 * percentage))
 | 
						|
                )
 | 
						|
            )
 | 
						|
            polygon.add_to(feature_group)
 | 
						|
            if show_labels and value_occurrence_in_region > 0:
 | 
						|
                shape_label(
 | 
						|
                    region_shape,
 | 
						|
                    '{:d}%'.format(int(round(100 * percentage))),  # ‰
 | 
						|
                    font_size=label_font_size
 | 
						|
                ).add_to(feature_group)
 | 
						|
 | 
						|
    return dict(zip(group_value_order, feature_groups))
 | 
						|
 | 
						|
 | 
						|
def scatter_pronunciation_map(
 | 
						|
        dataframe,
 | 
						|
        latitude_column, longitude_column,
 | 
						|
        group_column,
 | 
						|
        split_at_groups=6
 | 
						|
):
 | 
						|
    std = (0.0189, 0.0135)
 | 
						|
 | 
						|
    group_values, group_value_occurrences = zip(*sorted(
 | 
						|
        ((group_value, len(group_rows)) for group_value, group_rows in dataframe.groupby(group_column)),
 | 
						|
        key=lambda x: -x[1]
 | 
						|
    ))
 | 
						|
 | 
						|
    maps = (
 | 
						|
        [group_values, group_values[:split_at_groups], group_values[split_at_groups:]]
 | 
						|
        if len(group_values) > split_at_groups else [group_values]
 | 
						|
    )
 | 
						|
    result_names = ['all', 'most_occurring', 'least_occurring']
 | 
						|
 | 
						|
    results = {name: [] for name in result_names}
 | 
						|
 | 
						|
    for map, map_name in zip(maps, result_names):
 | 
						|
        colors = get_palette(len(map))
 | 
						|
        for group_value, group_color in zip(map, colors):
 | 
						|
            group_rows = dataframe[dataframe[group_column] == group_value]
 | 
						|
 | 
						|
            group_name = '<span style=\\"color: {}; \\">{}  ({})</span>'.format(
 | 
						|
                group_color, escape(group_value), len(group_rows))
 | 
						|
 | 
						|
            results[map_name].append(folium.FeatureGroup(name=group_name))
 | 
						|
 | 
						|
            for point in zip(group_rows[latitude_column], group_rows[longitude_column]):
 | 
						|
                point = tuple(p + s * np.random.randn() for p, s in zip(point, std))
 | 
						|
                folium.Circle(
 | 
						|
                    point,
 | 
						|
                    color=None,
 | 
						|
                    fill_color=group_color,
 | 
						|
                    radius=400 * min(1., 100 / len(group_rows)),
 | 
						|
                    fill_opacity=1
 | 
						|
                ).add_to(results[map_name][-1])
 | 
						|
 | 
						|
    return results
 | 
						|
 | 
						|
 | 
						|
def bar_map_css(legend_fontsize='30pt', attribution_fontsize='14pt'):
 | 
						|
    return FoliumCSS("""
 | 
						|
.leaflet-control-container .leaflet-control-layers-base {{
 | 
						|
    display: none;
 | 
						|
}}
 | 
						|
 | 
						|
.leaflet-control-container .leaflet-control-layers-separator {{
 | 
						|
    display: none;
 | 
						|
}}
 | 
						|
 | 
						|
.leaflet-control-container .leaflet-control-layers-overlays {{
 | 
						|
    display: flex
 | 
						|
}}
 | 
						|
 | 
						|
.leaflet-control-container .leaflet-control-layers-overlays label:not(:last-child) {{
 | 
						|
    margin-right: 15px;
 | 
						|
}}
 | 
						|
 | 
						|
.leaflet-control-container .leaflet-control-layers-overlays label span.with-block::before {{
 | 
						|
    content: '■ '; color: inherit;
 | 
						|
}}
 | 
						|
 | 
						|
.leaflet-control-container .leaflet-control-layers-overlays label {{
 | 
						|
    margin-bottom: 0px; font-size: {legend_fontsize};
 | 
						|
}}
 | 
						|
 | 
						|
.leaflet-control-container .leaflet-control-layers-overlays label input {{
 | 
						|
    display: none;
 | 
						|
}}
 | 
						|
 | 
						|
.leaflet-control-attribution a {{
 | 
						|
    display: none;
 | 
						|
}}
 | 
						|
 | 
						|
.leaflet-control-attribution.leaflet-control-attribution.leaflet-control-attribution.leaflet-control-attribution {{
 | 
						|
    background-color: white;
 | 
						|
    font-size: {attribution_fontsize};
 | 
						|
}}
 | 
						|
""".format(legend_fontsize=legend_fontsize, attribution_fontsize=attribution_fontsize))
 | 
						|
 | 
						|
 | 
						|
def save_map(m, filename, resolution=(1600, 1400), headless=True):
 | 
						|
    f = tempfile.NamedTemporaryFile(delete=False, suffix='.html')
 | 
						|
    f.close()
 | 
						|
    m.save(f.name)
 | 
						|
 | 
						|
    options = webdriver.ChromeOptions()
 | 
						|
    options.add_argument('--window-size={1},{0}'.format(*resolution))
 | 
						|
    if headless:
 | 
						|
        options.add_argument('--headless')
 | 
						|
 | 
						|
    browser = webdriver.Chrome(options=options)
 | 
						|
    browser.get("file://" + f.name)
 | 
						|
    time.sleep(1)
 | 
						|
 | 
						|
    browser.save_screenshot(filename)
 | 
						|
    browser.quit()
 | 
						|
    f.delete
 |