import numpy as np
import matplotlib
from matplotlib import colors
from matplotlib import cm
import matplotlib.pyplot as plt
import folium
from folium.plugins import HeatMap
from loci.analytics import bbox, kwds_freq, filter_by_kwd
from wordcloud import WordCloud
from pandas import DataFrame, concat
from geopandas import GeoDataFrame
import osmnx
[docs]def map_points(pois, sample_size=-1, kwd=None, show_bbox=False, tiles='OpenStreetMap', width='100%', height='100%'):
"""Returns a Folium Map displaying the provided points. Map center and zoom level are set automatically.
Args:
pois (GeoDataFrame): A GeoDataFrame containing the POIs to be displayed.
sample_size (int): Sample size (default: -1; show all).
kwd (string): A keyword to filter by (optional).
show_bbox (bool): Whether to show the bounding box of the GeoDataFrame (default: False).
tiles (string): The tiles to use for the map (default: `OpenStreetMap`).
width (integer or percentage): Width of the map in pixels or percentage (default: 100%).
height (integer or percentage): Height of the map in pixels or percentage (default: 100%).
Returns:
A Folium Map object displaying the given POIs.
"""
# Set the crs to WGS84
# if pois.crs['init'] != '4326':
# pois = pois.to_crs({'init': 'epsg:4326'})
# Filter by keyword
if kwd is None:
pois_filtered = pois
else:
pois_filtered = filter_by_kwd(pois, kwd)
# Pick a sample
if sample_size > 0 and sample_size < len(pois_filtered.index):
pois_filtered = pois_filtered.sample(sample_size)
# Automatically center the map at the center of the bounding box enclosing the POIs.
bb = bbox(pois_filtered)
map_center = [bb.centroid.y, bb.centroid.x]
# Initialize the map
m = folium.Map(location=map_center, tiles=tiles, width=width, height=height)
# Automatically set the zoom level
m.fit_bounds(([bb.bounds[1], bb.bounds[0]], [bb.bounds[3], bb.bounds[2]]))
# Create the marker cluster
locations = list(zip(pois_filtered.geometry.y.tolist(),
pois_filtered.geometry.x.tolist(),
pois_filtered.id.tolist(),
pois_filtered.name.tolist(),
pois_filtered.kwds.tolist()))
callback = """\
function (row) {
var icon, marker;
icon = L.AwesomeMarkers.icon({
icon: 'map-marker', markerColor: 'blue'});
marker = L.marker(new L.LatLng(row[0], row[1]));
marker.setIcon(icon);
var popup = L.popup({height: '300'});
popup.setContent(row[2] + '<br/>' + row[3] + '<br/>' + row[4]);
marker.bindPopup(popup);
return marker;
};
"""
m.add_child(folium.plugins.FastMarkerCluster(locations, callback=callback))
# Add pois to a marker cluster
#coords, popups = [], []
#for idx, poi in pois.iterrows():
# coords.append([poi.geometry.y, poi.geometry.x)]
# label = str(poi['id']) + '<br>' + str(poi['name']) + '<br>' + ' '.join(poi['kwds'])
# popups.append(folium.IFrame(label, width=300, height=100))
#poi_layer = folium.FeatureGroup(name='pois')
#poi_layer.add_child(MarkerCluster(locations=coords, popups=popups))
#m.add_child(poi_layer)
# folium.GeoJson(pois, tooltip=folium.features.GeoJsonTooltip(fields=['id', 'name', 'kwds'],
# aliases=['ID:', 'Name:', 'Keywords:'])).add_to(m)
if show_bbox:
folium.GeoJson(bb).add_to(m)
folium.LatLngPopup().add_to(m)
return m
[docs]def map_geometries(gdf, tiles='OpenStreetMap', width='100%', height='100%'):
"""Returns a Folium Map displaying the provided geometries. Map center and zoom level are set automatically.
Args:
gdf (GeoDataFrame): A GeoDataFrame containing the geometries to be displayed.
tiles (string): The tiles to use for the map (default: `OpenStreetMap`).
width (integer or percentage): Width of the map in pixels or percentage (default: 100%).
height (integer or percentage): Height of the map in pixels or percentage (default: 100%).
Returns:
A Folium Map object displaying the given geometries.
"""
# Set the crs to WGS84
if gdf.crs['init'] != '4326':
gdf = gdf.to_crs({'init': 'epsg:4326'})
# Automatically center the map at the center of the bounding box enclosing the POIs.
bb = bbox(gdf)
map_center = [bb.centroid.y, bb.centroid.x]
# Initialize the map
m = folium.Map(location=map_center, tiles=tiles, width=width, height=height)
# Automatically set the zoom level
m.fit_bounds(([bb.bounds[1], bb.bounds[0]], [bb.bounds[3], bb.bounds[2]]))
# Construct tooltip
fields = list(gdf.columns.values)
fields.remove('geometry')
if 'style' in fields:
fields.remove('style')
tooltip = folium.features.GeoJsonTooltip(fields=fields)
# Add to map
folium.GeoJson(gdf, tooltip=tooltip).add_to(m)
return m
[docs]def map_geometry(geom, tiles='OpenStreetMap', width='100%', height='100%'):
"""Returns a Folium Map displaying the provided geometry. Map center and zoom level are set automatically.
Args:
geom (Shapely Geometry): A geometry to be displayed.
tiles (string): The tiles to use for the map (default: `OpenStreetMap`).
width (integer or percentage): Width of the map in pixels or percentage (default: 100%).
height (integer or percentage): Height of the map in pixels or percentage (default: 100%).
Returns:
A Folium Map object displaying the given geometry.
"""
m = folium.Map(location=[geom.centroid.y, geom.centroid.x], tiles=tiles, width=width, height=height)
m.fit_bounds(([geom.bounds[1], geom.bounds[0]], [geom.bounds[3], geom.bounds[2]]))
folium.GeoJson(geom).add_to(m)
return m
[docs]def barchart(data, orientation='Vertical', x_axis_label='', y_axis_label='', plot_title='', bar_width=0.5,
plot_width=15, plot_height=5, top_k=10):
"""Plots a bar chart with the given data.
Args:
data (dict): The data to plot.
orientation (string): The orientation of the bars in the plot (`Vertical` or `Horizontal`; default: `Vertical`).
x_axis_label (string): Label of x axis.
y_axis_label (string): Label of y axis.
plot_title (string): Title of the plot.
bar_width (scalar): The width of the bars (default: 0.5).
plot_width (scalar): The width of the plot (default: 15).
plot_height (scalar): The height of the plot (default: 5).
top_k (integer): Top k results (if -1, show all; default: 10).
Returns:
A Matplotlib plot displaying the bar chart.
"""
plt.rcdefaults()
matplotlib.rcParams['figure.figsize'] = [plot_width, plot_height]
sort_order = True
if orientation != 'Vertical':
sort_order = False
sorted_by_value = sorted(data.items(), key=lambda kv: kv[1], reverse=sort_order)
if top_k != -1:
if orientation != 'Vertical':
sorted_by_value = sorted_by_value[-top_k:]
else:
sorted_by_value = sorted_by_value[0:top_k]
(objects, performance) = map(list, zip(*sorted_by_value))
y_pos = np.arange(len(objects))
if orientation == 'Vertical':
plt.bar(y_pos, performance, width=bar_width, align='center', alpha=0.5)
plt.xticks(y_pos, objects)
else:
plt.barh(y=y_pos, width=performance, align='center', alpha=0.5)
plt.yticks(y_pos, objects)
plt.xlabel(x_axis_label)
plt.ylabel(y_axis_label)
plt.title(plot_title)
return plt
[docs]def plot_wordcloud(pois, bg_color='black', width=400, height=200):
"""Generates and plots a word cloud from the keywords of the given POIs.
Args:
pois (GeoDataFrame): The POIs from which the keywords will be used to generate the word cloud.
bg_color (string): The background color to use for the plot (default: black).
width (int): The width of the plot.
height (int): The height of the plot.
"""
# Compute keyword frequences
kf = kwds_freq(pois)
# Generate the word cloud
wordcloud = WordCloud(background_color=bg_color, width=width, height=height).generate_from_frequencies(kf)
# Show plot
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()
[docs]def heatmap(pois, sample_size=-1, kwd=None, tiles='OpenStreetMap', width='100%', height='100%', radius=10):
"""Generates a heatmap of the input POIs.
Args:
pois (GeoDataFrame): A POIs GeoDataFrame.
sample_size (int): Sample size (default: -1; show all).
kwd (string): A keyword to filter by (optional).
tiles (string): The tiles to use for the map (default: `OpenStreetMap`).
width (integer or percentage): Width of the map in pixels or percentage (default: 100%).
height (integer or percentage): Height of the map in pixels or percentage (default: 100%).
radius (float): Radius of each point of the heatmap (default: 10).
Returns:
A Folium Map object displaying the heatmap generated from the POIs.
"""
# Set the crs to WGS84
#if pois.crs['init'] != '4326':
# pois = pois.to_crs({'init': 'epsg:4326'})
# Filter by keyword
if kwd is None:
pois_filtered = pois
else:
pois_filtered = filter_by_kwd(pois, kwd)
# Pick a sample
if sample_size > 0 and sample_size < len(pois_filtered.index):
pois_filtered = pois_filtered.sample(sample_size)
# Automatically center the map at the center of the gdf's bounding box
bb = bbox(pois_filtered)
map_center = [bb.centroid.y, bb.centroid.x]
heat_map = folium.Map(location=map_center, tiles=tiles, width=width, height=height)
# Automatically set zoom level
heat_map.fit_bounds(([bb.bounds[1], bb.bounds[0]], [bb.bounds[3], bb.bounds[2]]))
heat_data = [[row['geometry'].y, row['geometry'].x] for index, row in pois_filtered.iterrows()]
# Plot it on the map
HeatMap(heat_data, radius=radius).add_to(heat_map)
return heat_map
[docs]def map_choropleth(areas, id_field, value_field, fill_color='YlOrRd', fill_opacity=0.6, num_bins=5,
tiles='OpenStreetMap', width='100%', height='100%'):
"""Returns a Folium Map showing the clusters. Map center and zoom level are set automatically.
Args:
areas (GeoDataFrame): A GeoDataFrame containing the areas to be displayed.
id_field (string): The name of the column to use as id.
value_field (string): The name of the column indicating the area's value.
fill_color (string): A string indicating a Matplotlib colormap (default: YlOrRd).
fill_opacity (float): Opacity level (default: 0.6).
num_bins (int): The number of bins for the threshold scale (default: 5).
tiles (string): The tiles to use for the map (default: `OpenStreetMap`).
width (integer or percentage): Width of the map in pixels or percentage (default: 100%).
height (integer or percentage): Height of the map in pixels or percentage (default: 100%).
Returns:
A Folium Map object displaying the given clusters.
"""
# Set the crs to WGS84
# if areas.crs['init'] != '4326':
# areas = areas.to_crs({'init': 'epsg:4326'})
# Automatically center the map at the center of the bounding box enclosing the POIs.
bb = bbox(areas)
map_center = [bb.centroid.y, bb.centroid.x]
# Initialize the map
m = folium.Map(location=map_center, tiles=tiles, width=width, height=height)
# Automatically set the zoom level
m.fit_bounds(([bb.bounds[1], bb.bounds[0]], [bb.bounds[3], bb.bounds[2]]))
# threshold_scale = Natural_Breaks(areas[value_field], k=num_bins).bins.tolist()
# threshold_scale.insert(0, areas[value_field].min())
# choropleth = folium.Choropleth(areas, data=areas, columns=[id_field, value_field],
# key_on='feature.properties.{}'.format(id_field),
# fill_color=fill_color, fill_opacity=fill_opacity,
# threshold_scale=threshold_scale).add_to(m)
choropleth = folium.Choropleth(areas, data=areas, columns=[id_field, value_field],
key_on='feature.properties.{}'.format(id_field),
fill_color=fill_color, fill_opacity=fill_opacity).add_to(m)
# Construct tooltip
fields = list(areas.columns.values)
fields.remove('geometry')
if 'style' in fields:
fields.remove('style')
tooltip = folium.features.GeoJsonTooltip(fields=fields)
choropleth.geojson.add_child(tooltip)
return m
[docs]def map_clusters_with_topics(clusters_topics, viz_type='dominant', col_id='cluster_id', col_dominant='Dominant Topic',
colormap='tab10', red='Topic0', green='Topic1', blue='Topic2', single_topic='Topic0',
tiles='OpenStreetMap', width='100%', height='100%'):
"""Returns a Folium Map showing the clusters colored based on their topics.
Args:
clusters_topics (GeoDataFrame): A GeoDataFrame containing the clusters to be displayed and their topics.
viz_type (string): Indicates how to assign colors based on topics. One of: 'dominant', 'single', 'rgb'.
col_id (string): The name of the column indicating the cluster id (default: cluster_id).
col_dominant (string): The name of the column indicating the dominant topic (default: Dominant Topic).
colormap (string): A string indicating a Matplotlib colormap (default: tab10).
red (string): The name of the column indicating the topic to assign to red (default: Topic0).
green (string): The name of the column indicating the topic to assign to green (default: Topic1).
blue (string): The name of the column indicating the topic to assign to blue (default: Topic2).
single_topic (string): The name of the column indicating the topic to use (default: Topic0).
tiles (string): The tiles to use for the map (default: `OpenStreetMap`).
width (integer or percentage): Width of the map in pixels or percentage (default: 100%).
height (integer or percentage): Height of the map in pixels or percentage (default: 100%).
Returns:
A Folium Map object displaying the given clusters colored by their topics.
"""
def style_gen_dominant(row):
cmap = cm.get_cmap(colormap)
rgb = cmap(row[col_dominant])
color = colors.rgb2hex(rgb)
return {'fillColor': color, 'weight': 2, 'color': 'black', 'fillOpacity': 0.8}
def style_gen_mixed(row):
r = round(row[red] * 255) if red is not None else 0
g = round(row[green] * 255) if green is not None else 0
b = round(row[blue] * 255) if blue is not None else 0
color = '#{:02x}{:02x}{:02x}'.format(r, g, b)
return {'fillColor': color, 'weight': 2, 'color': 'black', 'fillOpacity': 0.8}
if viz_type == 'single':
m = map_choropleth(areas=clusters_topics, id_field=col_id, value_field=single_topic, tiles=tiles, width=width,
height=height)
elif viz_type == 'rgb':
clusters_topics['style'] = clusters_topics.apply(lambda row: style_gen_mixed(row), axis=1)
m = map_geometries(clusters_topics, tiles=tiles, width=width, height=height)
else:
clusters_topics['style'] = clusters_topics.apply(lambda row: style_gen_dominant(row), axis=1)
m = map_geometries(clusters_topics, tiles=tiles, width=width, height=height)
return m
[docs]def map_cluster_diff(clusters_a, clusters_b, intersection_color='#00ff00', diff_ab_color='#0000ff',
diff_ba_color='#ff0000', tiles='OpenStreetMap', width='100%', height='100%'):
"""Returns a Folium Map displaying the differences between two sets of clusters. Map center and zoom level
are set automatically.
Args:
clusters_a (GeoDataFrame): The first set of clusters.
clusters_b (GeoDataFrame): The second set of clusters.
intersection_color (color code): The color to use for A & B.
diff_ab_color (color code): The color to use for A - B.
diff_ba_color (color code): The color to use for B - A.
tiles (string): The tiles to use for the map (default: `OpenStreetMap`).
width (integer or percentage): Width of the map in pixels or percentage (default: 100%).
height (integer or percentage): Height of the map in pixels or percentage (default: 100%).
Returns:
A Folium Map object displaying cluster intersections and differences.
"""
if clusters_a.crs['init'] != '4326':
clusters_a = clusters_a.to_crs({'init': 'epsg:4326'})
if clusters_b.crs['init'] != '4326':
clusters_b = clusters_b.to_crs({'init': 'epsg:4326'})
spatial_index_b = clusters_b.sindex
prev_size_list = []
prev_cid_list = []
after_cid_list = []
after_size_list = []
intersect_area_percentage_list = []
intersection_polygons = []
diff_ab_polygs = []
diff_ba_polygs = []
new_polygs = []
intersection_polygons_attr = []
diff_ab_polygs_attr = []
diff_ba_polygs_attr = []
new_polygs_attr = []
for index_1, row_1 in clusters_a.iterrows():
size = row_1['size']
poly = row_1['geometry']
cid = row_1['cluster_id']
prev_area = poly.area
prev_cid_list.append(cid)
prev_size_list.append(size)
possible_matches_index = list(spatial_index_b.intersection(poly.bounds))
possible_matches = clusters_b.iloc[possible_matches_index]
max_area = 0.0
max_cid_intersect = -1
max_size = 0
max_polyg = None
max_intersect_polyg = None
for index_2, row_2 in possible_matches.iterrows():
size_2 = row_2['size']
poly_2 = row_2['geometry']
cid_2 = row_2['cluster_id']
intersect_polyg = poly.intersection(poly_2)
area = intersect_polyg.area
if area > max_area:
max_area = area
max_cid_intersect = cid_2
max_size = size_2
max_polyg = poly_2
max_intersect_polyg = intersect_polyg
if max_cid_intersect == -1:
after_cid_list.append(np.nan)
after_size_list.append(np.nan)
intersect_area_percentage_list.append(0.0)
new_polygs.append(poly)
new_polygs_attr.append('A (' + str(cid) + ')')
else:
after_cid_list.append(max_cid_intersect)
after_size_list.append(max_size)
intersect_area_percentage_list.append(max_area / prev_area)
ab_diff_poly = poly.difference(max_polyg)
ba_diff_poly = max_polyg.difference(poly)
intersection_polygons.append(max_intersect_polyg)
diff_ab_polygs.append(ab_diff_poly)
diff_ba_polygs.append(ba_diff_poly)
intersection_polygons_attr.append('A(' + str(cid) + ') & B(' + str(max_cid_intersect) + ')')
diff_ab_polygs_attr.append('A(' + str(cid) + ') - B(' + str(max_cid_intersect) + ')')
diff_ba_polygs_attr.append('B(' + str(max_cid_intersect) + ') - A(' + str(cid) + ')')
spatial_index_a = clusters_a.sindex
old_polys = []
old_poly_attr = []
for index, row in clusters_b.iterrows():
poly = row['geometry']
cid = row['cluster_id']
possible_matches_index = list(spatial_index_a.intersection(poly.bounds))
possible_matches = clusters_a.iloc[possible_matches_index]
max_area = 0.0
for index_2, row_2 in possible_matches.iterrows():
poly_2 = row_2['geometry']
intersect_polyg = poly.intersection(poly_2)
area = intersect_polyg.area
if area > max_area:
max_area = area
if max_area == 0.0:
old_polys.append(poly)
old_poly_attr.append('B (' + str(cid) + ')')
intersection_gdf = GeoDataFrame(list(zip(intersection_polygons, intersection_polygons_attr)),
columns=['geometry', 'diff'], crs=clusters_a.crs)
diff_ab_gdf = GeoDataFrame(list(zip(diff_ab_polygs + new_polygs, diff_ab_polygs_attr + new_polygs_attr)),
columns=['geometry', 'diff'], crs=clusters_a.crs)
diff_ba_gdf = GeoDataFrame(list(zip(diff_ba_polygs + old_polys, diff_ba_polygs_attr + old_poly_attr)),
columns=['geometry', 'diff'], crs=clusters_a.crs)
# Filter out erroneous rows
intersection_gdf = intersection_gdf[(intersection_gdf.geometry.area > 0.0)]
diff_ab_gdf = diff_ab_gdf[(diff_ab_gdf.geometry.area > 0.0)]
diff_ba_gdf = diff_ba_gdf[(diff_ba_gdf.geometry.area > 0.0)]
# Add colors
intersection_style = {'fillColor': intersection_color, 'weight': 2, 'color': 'black', 'fillOpacity': 0.8}
diff_ab_style = {'fillColor': diff_ab_color, 'weight': 2, 'color': 'black', 'fillOpacity': 0.8}
diff_ba_style = {'fillColor': diff_ba_color, 'weight': 2, 'color': 'black', 'fillOpacity': 0.8}
intersection_gdf['style'] = intersection_gdf.apply(lambda x: intersection_style, axis=1)
diff_ab_gdf['style'] = diff_ab_gdf.apply(lambda x: diff_ab_style, axis=1)
diff_ba_gdf['style'] = diff_ba_gdf.apply(lambda x: diff_ba_style, axis=1)
# Concatenate results
all_gdf = concat([diff_ab_gdf, diff_ba_gdf, intersection_gdf], ignore_index=True, sort=False)
# Generate map
m = map_geometries(all_gdf, tiles=tiles, width=width, height=height)
return m
[docs]def map_cluster_contents_osm(cluster_borders, tiles='OpenStreetMap', width='100%', height='100%'):
"""Constructs a Folium Map displaying the streets and buildings, retreived from OpenStreetMap via OSMNX, within
a given AOI.
Args:
cluster_borders (GeoDataFrame): The cluster polygons.
tiles (string): The tiles to use for the map (default: `OpenStreetMap`).
width (integer or percentage): Width of the map in pixels or percentage (default: 100%).
height (integer or percentage): Height of the map in pixels or percentage (default: 100%).
Returns:
A Folium Map object displaying the retreived entities.
"""
def is_nan(num):
return num != num
# set the crs to WGS84
if cluster_borders.crs['init'] != '4326':
cluster_borders = cluster_borders.to_crs({'init': 'epsg:4326'})
empty_df = DataFrame(columns=['geometry', 'cluster_id', 'type', 'info'])
final_gdf = GeoDataFrame(empty_df, crs=cluster_borders.crs, geometry='geometry')
for index, row in cluster_borders.iterrows():
poly = row['geometry']
cluster_id = row['cluster_id']
osmnx_gdf = osmnx.footprints.create_footprints_gdf(polygon=poly, footprint_type='building',
retain_invalid=False)
remaining_cols = []
columns = list(osmnx_gdf)
if 'amenity' in columns:
remaining_cols.append('amenity')
if 'building' in columns:
remaining_cols.append('building')
remaining_cols.append('geometry')
osmnx_gdf = osmnx_gdf[remaining_cols]
osmnx_gdf['cluster_id'] = cluster_id
col_names_2 = list(osmnx_gdf)
info_list = []
for index2, row2 in osmnx_gdf.iterrows():
info = ''
if 'building' in col_names_2:
building = row2['building']
if not is_nan(building):
if building != 'yes':
info += building + ', '
if 'amenity' in col_names_2:
amenity = row2['amenity']
if not is_nan(amenity):
info += amenity
info_list.append(info)
del remaining_cols[-1]
osmnx_gdf['type'] = 'building'
osmnx_gdf['info'] = info_list
osmnx_gdf.drop(columns=remaining_cols, inplace=True)
oxg = osmnx.core.graph_from_polygon(poly, network_type='all_private', infrastructure='way["highway"]')
gdf_edges = osmnx.save_load.graph_to_gdfs(oxg, nodes=False, edges=True, node_geometry=True,
fill_edge_geometry=True)
columns_3 = list(gdf_edges)
remaining_cols_3 = []
if 'highway' in columns_3:
remaining_cols_3.append('highway')
if 'name' in columns_3:
remaining_cols_3.append('name')
if 'oneway' in columns_3:
remaining_cols_3.append('oneway')
remaining_cols_3.append('geometry')
gdf_edges = gdf_edges[remaining_cols_3]
gdf_edges['cluster_id'] = cluster_id
info_edge_list = []
for index2, row3 in gdf_edges.iterrows():
info = ''
if 'name' in columns_3:
name = row3['name']
if not is_nan(name):
if isinstance(name, list):
info += ','.join(name)
else:
info += name + ', '
if 'highway' in columns_3:
highway = row3['highway']
if not is_nan(highway):
if isinstance(highway, list):
info += ','.join(highway)
else:
info += highway + ', '
if 'oneway' in columns_3:
oneway = row3['oneway']
if not is_nan(oneway):
info += 'oneway'
info_edge_list.append(info)
del remaining_cols_3[-1]
gdf_edges['type'] = 'Road'
gdf_edges['info'] = info_edge_list
gdf_edges.drop(columns=remaining_cols_3, inplace=True)
res_gdf = concat([osmnx_gdf, gdf_edges], axis=0, join='outer', ignore_index=False,
keys=None, levels=None, names=None, verify_integrity=False, copy=True)
final_gdf = concat([final_gdf, res_gdf], axis=0, join='outer', ignore_index=False,
keys=None, levels=None, names=None, verify_integrity=False, copy=True)
final_gdf.reset_index(inplace=True, drop=True)
m = map_geometries(final_gdf, tiles=tiles, width=width, height=height)
return m