Parallel computation of roads disruption over large areas#
Considering the potential large-scale implications that disruptions, most prominently floods, can have on road infrastructure, it is important to test the computational requirements needed to perform the accessibility analysis.
While the Raster-based computation is relatively light in terms of time requirements, Vector-based analyses are more computationally intensive due to the modelling of the individual road segments that make up the network over which accessibility is assessed.
The following provides an example of the code and the libraries needed to perform a flood-disruption task over Pakistan, which cannot be accomplished using the Jupyter notebooks used up to now.
Python code#
A standard Python code is needed to perform parallel computation, achieved using:
Dask-GeoPandas is used to import and store the road-network shapefile as a GeoPandas DataFrame, whereas Multiprocessing is used to parallelize the intersection operation between the road network and the flood hazard map. This operation is performed by splitting the road network into chunks (blocks) composed of a user-defined number of roads (100,000 in the code below) which are automatically processed in parallel across the available CPUs of the machine.
The Python code containing the parallelization is pak_vector_parallel_rtree_opt.py, and its content is reported below:
#################### ============================================================ ################################
## IMPORT PACKAGES AND LIBRARIES
# System
import sys
import os
import time
from os.path import join, expanduser
from pathlib import Path
# Avoid warnings to pop up
import warnings
warnings.filterwarnings('ignore')
from rasterio.plot import plotting_extent
from rasterio.plot import show
from mpl_toolkits.axes_grid1 import make_axes_locatable
import contextily as ctx
# Processing
import numpy as np
import geopandas as gpd
import pandas as pd
import osmnx as ox  # required by rtree_intersect (ox.utils_geo._quadrat_cut_geometry)
# Raster
import rasterio as rio
from rasterio.features import shapes
from shapely.geometry import box
from rasterio.features import geometry_mask
from shapely.geometry import Polygon, box, Point
from shapely.geometry import mapping
# Parallelization
import multiprocessing
import dask
import dask_geopandas as dask_gpd
import rioxarray as rioxr
# Define your path to the Repositories
sys.path.append(join(expanduser("/home/jupyter-wb618081"), 'Repos', 'gostrocks', 'src'))
sys.path.append(join(expanduser("/home/jupyter-wb618081"), 'Repos', 'GOSTNets_Raster', 'src'))
sys.path.append(join(expanduser("/home/jupyter-wb618081"), 'Repos', 'GOSTnets'))
sys.path.append(join(expanduser("/home/jupyter-wb618081"), 'Repos', 'GOST_Urban', 'src', 'GOST_Urban'))
sys.path.append(join(expanduser("/home/jupyter-wb618081"), 'Repos', 'health-equity-diagnostics', 'src', 'modules'))
sys.path.append(join(expanduser("/home/jupyter-wb618081"), 'Repos', 'INFRA_SAP'))
import GOSTnets as gn
from GOSTnets.load_osm import *
import GOSTRocks.rasterMisc as rMisc
from GOSTRocks.misc import get_utm
import GOSTNetsRaster.market_access as ma
import UrbanRaster as urban
from infrasap import aggregator
from infrasap import osm_extractor as osm
from utils import download_osm_shapefiles
#################### ============================================================ ################################
## IMPORT DATA
data_dir = join(expanduser("/home/jupyter-wb618081"), 'data')
scratch_dir = join(expanduser("/home/jupyter-wb618081"), 'Health-Access-Metrics')
out_path = join(expanduser("/home/jupyter-wb618081"), 'Health-Access-Metrics', 'Output')
epsg = "EPSG:4326"
epsg_utm = "EPSG:32736"
iso = 'PAK'
## Import FLOOD
# Import multiple rasterio .tif file as a dictionary
# Keys are return periods
# Values are rasterio arrays
# inland waters and oceans: 999
# not-flooded areas: -999 (Fluvial)
# not-flooded areas: 0 (Pluvial)
# Other values represent the flood depth (in m)
flood_fluvial_path = join(data_dir, iso,'FLOOD_SSBN','fluvial_undefended')
flood_pluvial_path = join(data_dir, iso,'FLOOD_SSBN','pluvial')
files=os.listdir(flood_fluvial_path)
flood_dict_fluvial = {}
for file in files:
key = file.split('_')[1].split('.')[0]
value = rio.open(join(flood_fluvial_path,file)) #.read(1)
flood_dict_fluvial[key] = value
files=os.listdir(flood_pluvial_path)
flood_dict_pluvial = {}
for file in files:
key = file.split('_')[1].split('.')[0]
value = rio.open(join(flood_pluvial_path,file)) #.read(1)
flood_dict_pluvial[key] = value
# Preserve the maximum flood depth
flood_dict = {}
for f,key in enumerate(flood_dict_pluvial.keys()):
out_flood_path = join(data_dir, iso,'FLOOD_SSBN', 'Fmax_' + key +'.tif')
if os.path.isfile(out_flood_path):
value = rioxr.open_rasterio(out_flood_path, chunks = 1000)
# value = rio.open(out_flood_path)
flood_dict[key] = value
else:
out_meta = flood_dict_pluvial[key].meta
flood_max = np.fmax(flood_dict_fluvial[key].read(1),flood_dict_pluvial[key].read(1))
flood_dict[key] = flood_max
# flood_dict[key][flood_dict[key] == 0] = -999
# Write the output raster
out_flood_path = join(data_dir, iso,'FLOOD_SSBN', 'Fmax_' + key +'.tif')
with rio.open(out_flood_path, 'w', **out_meta) as dst:
dst.write(flood_max, 1)
# Read the output raster
# rioxarray
# value = rioxr.open_rasterio(out_flood_path, chunks = 100)
# rasterio
value = rio.open(out_flood_path)
flood_dict[key] = value
# Free up memory
for f,key in enumerate(flood_dict.keys()):
del flood_dict_fluvial[key]
del flood_dict_pluvial[key]
flood_dict[key]
## Import ROADS
# Load the Road network
roads = dask_gpd.read_file(join(data_dir, iso, "pakistan-latest-free.shp", 'gis_osm_roads_free_1.shp'), npartitions = 8)
roads = roads.to_crs(epsg)
def get_num(x):
try:
return(int(x))
except:
return(5)
# roads.rename(columns = {"fclass":"OSMLR_num"}, inplace = True)
roads['OSMLR_num'] = roads['fclass'].apply(lambda x: get_num(str(x)[-1]))
# Floods can impact all the roads, except the bridges in primary and secondary roads
safe = ((roads['bridge'] == "T") & ((roads['OSMLR_num'] == 1) | (roads['OSMLR_num'] == 2)))
roads_safe = roads[safe][["osm_id", "fclass","bridge","geometry","OSMLR_num"]]
roads_flood = roads[~safe][["osm_id", "fclass","bridge","geometry","OSMLR_num"]]
roads_safe = roads_safe.compute()
roads_flood = roads_flood.compute()
#################### ============================================================ ################################
## PARALLEL COMPUTATION OF ROADS/FLOOD DISRUPTION
import psutil
import multiprocessing
def raster_to_vect(mask, transform):
polygons = []
for index, value in np.ndenumerate(mask):
row, col = index
if value:
top_left_x, top_left_y = transform * (col, row)
bottom_right_x, bottom_right_y = transform * (col + 1, row + 1)
polygon = box(top_left_x, bottom_right_y, bottom_right_x, top_left_y)
polygons.append(polygon)
return polygons
# Function to perform intersection
def rtree_intersect(roads_flood_gdf, flood_gdf):
intersection_gdf = gpd.GeoDataFrame(columns=roads_flood.columns, crs=roads_flood.crs)
flood_gdf = flood_gdf.unary_union
flood_gdf = ox.utils_geo._quadrat_cut_geometry(flood_gdf, 0.5)
for poly in flood_gdf.geoms:
spatial_index = roads_flood_gdf.sindex
possible_matches_index = list(spatial_index.intersection(poly.bounds))
possible_matches = roads_flood_gdf.iloc[possible_matches_index]
precise_matches = possible_matches[possible_matches.intersects(poly)]
intersection_gdf = gpd.GeoDataFrame(pd.concat([intersection_gdf, precise_matches], ignore_index=True), crs=roads_flood.crs)
intersection_gdf.drop_duplicates(subset="osm_id", inplace=True)
return intersection_gdf
# Function to perform parallel intersections
def parallel_intersects(roads_flood_gdf, flood_gdf, chunk_size):
chunks = []
rows, _ = flood_gdf.shape
for r in range(0, rows, chunk_size):
chunk_flood_poly = flood_gdf.iloc[r:r+chunk_size]
chunks.append(chunk_flood_poly)
with multiprocessing.Pool() as pool:
results = pool.starmap(rtree_intersect, zip([roads_flood_gdf]*len(chunks), chunks))
matches = gpd.GeoDataFrame(pd.concat(results, ignore_index=False), crs=roads_flood_gdf.crs)
matches.crs = epsg
return matches
# Main code
if __name__ == "__main__":
cpun = multiprocessing.cpu_count()
print("Number of CPU:", cpun)
cpu_percent = psutil.cpu_percent()
print("CPU Usage:", cpu_percent)
memory_info = psutil.virtual_memory()
print("Memory Total: GB", memory_info.total/(10**9))
print("Memory Used: GB", memory_info.used/(10**9))
print("Memory Free: GB", memory_info.free/(10**9))
for scen in list(flood_dict.keys()):
start_time = time.time()
print("VECTORIZATION started...")
## VECTORIZATION CODE
# Rioxarray (chunks = 1000)
flood_road = flood_dict[scen].copy()
# If water level is more than 20 cm, the road is disrupted
transf = flood_road.rio.transform()
mask = (flood_road >= 0.2).isel(band = 0)
# Vectorize the masked cells
flood_poly = raster_to_vect(mask, transf)
flood_poly_gdf = gpd.GeoDataFrame(geometry=flood_poly, crs=epsg) # Make sure to set the correct CRS
end_time = time.time()
duration = end_time - start_time
print("VECTORIZATION Duration:", duration/60, "minutes")
start_time = time.time()
print("INTERSECTION started...")
## INTERSECTION CODE
matches = parallel_intersects(roads_flood, flood_poly_gdf, chunk_size=100000) #, chunk_size=75000
end_time = time.time()
duration = end_time - start_time
print("INTERSECTION Duration:", duration/60, "minutes")
print("SAVING Roads disrupted", scen)
# Save results
file = join(out_path,iso,"roads_impact_"+scen+"_new.shp")
roads_flood_impact = roads_flood.drop(matches.index)
roads_flood_impact = pd.concat([roads_flood_impact, roads_safe], axis = 0)
matches.to_file(file, index = False)
# Free up memory
del flood_road
del flood_poly
del flood_poly_gdf
del matches
del roads_flood_impact
Bash script#
The operation can be computed by running the parallel.sh Bash script as below:
#!/bin/bash
# Activate your Python virtual environment if needed
source activate pam_tool
# Define Python script file path
PYTHON_SCRIPT="$$USER_PATH$$/Health-Access-Metrics/notebooks/pak_vector_parallel_rtree_opt.py"
# Execute the Python script
time python "$PYTHON_SCRIPT"
conda deactivate
This is also embedded into the HF_Access_flood_PAK_vector.ipynb notebook for its integration with the rest of the analysis:
! ./parallel.sh