"""Max-p regions algorithm.
Source: Wei, Ran, Sergio J. Rey, and Elijah Knaap (2020) "Efficient
regionalization for spatially explicit neighborhood delineation." International
Journal of Geographical Information Science. Accepted 2020-04-12.
"""
__author__ = ["Ran Wei", "Serge Rey", "Elijah Knaap"]
__email__ = "sjsrey@gmail.com"
from ..BaseClass import BaseSpOptHeuristicSolver
from scipy.spatial.distance import pdist, squareform
from scipy.spatial import KDTree
from scipy.sparse.csgraph import connected_components
import libpysal
import numpy as np
from copy import deepcopy
from .base import infeasible_components, plot_components, modify_components
ITERCONSTRUCT = 999
ITERSA = 10
def maxp(
gdf,
w,
attrs_name,
threshold_name,
threshold,
top_n,
max_iterations_construction=ITERCONSTRUCT,
max_iterations_sa=ITERSA,
verbose=False,
policy="single",
):
"""The max-p-regions involves the aggregation of n areas into an unknown maximum number of
homogeneous regions, while ensuring that each region is contiguous and satisfies a minimum
threshold value imposed on a predefined spatially extensive attribute.
Parameters
----------
gdf : geopandas.GeoDataFrame, required
Geodataframe containing original data
w : libpysal.weights.W, required
Weights object created from given data
attrs_name : list, required
Strings for attribute names to measure similarity (cols of ``geopandas.GeoDataFrame``).
threshold_name : string, requied
The name of the spatial extensive attribute variable.
threshold : {int, float}, required
The threshold value.
top_n : int
Max number of candidate regions for enclave assignment.
max_iterations_construction : int
Max number of iterations for construction phase.
max_iterations_SA: int
Max number of iterations for customized simulated annealing.
verbose : boolean
Set to ``True`` for reporting solution progress/debugging.
Default is ``False``.
policy : str
Defaults to ``single`` to attach infeasible components using a
single linkage between the area in the infeasible component
with the smallest nearest neighbor distance to an area in a
feasible component. ``multiple`` adds joins for each area
in an infeasible component and their nearest neighbor area in a
feasible component. ``keep`` attempts to solve without
modification (useful for debugging). ``drop`` removes areas in
infeasible components before solving.
Returns
-------
max_p : int
The number of regions.
labels : numpy.array
Region IDs for observations.
"""
gdf, w = modify_components(gdf, w, threshold_name, threshold, policy=policy)
attr = np.atleast_2d(gdf[attrs_name].values)
if attr.shape[0] == 1:
attr = attr.T
threshold_array = gdf[threshold_name].values
distance_matrix = squareform(pdist(attr, metric="cityblock"))
n, k = attr.shape
arr = np.arange(n)
max_p, rl_list = construction_phase(
arr,
attr,
threshold_array,
distance_matrix,
w,
threshold,
top_n,
max_iterations_construction,
)
if verbose:
print("max_p: ", max_p)
print("number of good partitions:", len(rl_list))
alpha = 0.998
tabuLength = 10
max_no_move = n
best_obj_value = np.inf
best_label = None
for irl, rl in enumerate(rl_list):
label, regionList, regionSpatialAttr = rl
if verbose:
print(irl)
for saiter in range(max_iterations_sa):
finalLabel, finalRegionList, finalRegionSpatialAttr = performSA(
label,
regionList,
regionSpatialAttr,
threshold_array,
w,
distance_matrix,
threshold,
alpha,
tabuLength,
max_no_move,
)
totalWithinRegionDistance = calculateWithinRegionDistance(
finalRegionList, distance_matrix
)
if verbose:
print("totalWithinRegionDistance after SA: ")
print(totalWithinRegionDistance)
if totalWithinRegionDistance < best_obj_value:
best_obj_value = totalWithinRegionDistance
best_label = finalLabel
if verbose:
print("best objective value:")
print(best_obj_value)
return max_p, best_label
def construction_phase(
arr,
attr,
threshold_array,
distance_matrix,
weight,
spatialThre,
random_assign_choice,
max_it=999,
):
"""Construct feasible solutions for max-p-regions.
Parameters
----------
arr : array, required
An array of index of area units.
attr : array, required
An array of the values of the attributes.
threshold_array : array, required
An array of the values of the spatial extensive attribute.
distance_matrix : array, required
A square-form distance matrix for the attributes.
weight : libpysal.weights.W, required
Weights object created from given data.
spatialThre : {int, float}, required
The threshold value.
random_assign_choice : int, required
The number of top candidate regions to consider for enclave assignment.
max_it : int
Maximum number of iterations. Default is 999.
Returns
-------
real_values : list
``realmaxpv``, ``realLabelsList``
"""
labels_list = []
pv_list = []
max_p = 0
maxp_labels = None
maxp_regionList = None
maxp_regionSpatialAttr = None
for _ in range(max_it):
labels = [0] * len(threshold_array)
C = 0
regionSpatialAttr = {}
enclave = []
regionList = {}
np.random.shuffle(arr)
labeledID = []
for arr_index in range(0, len(threshold_array)):
P = arr[arr_index]
if not (labels[P] == 0):
continue
NeighborPolys = deepcopy(weight.neighbors[P])
if len(NeighborPolys) == 0:
labels[P] = -1
else:
C += 1
labeledID, spatialAttrTotal = growClusterForPoly(
labels, threshold_array, P, NeighborPolys, C, weight, spatialThre
)
if spatialAttrTotal < spatialThre:
C -= 1
enclave.extend(labeledID)
else:
regionList[C] = labeledID
regionSpatialAttr[C] = spatialAttrTotal
num_regions = len(regionList)
for i, l in enumerate(labels):
if l == -1:
enclave.append(i)
if num_regions < max_p:
continue
else:
max_p = num_regions
maxp_labels, maxp_regionList, maxp_regionSpatialAttr = assignEnclave(
enclave,
labels,
regionList,
regionSpatialAttr,
threshold_array,
weight,
distance_matrix,
random_assign=random_assign_choice,
)
pv_list.append(max_p)
labels_list.append([maxp_labels, maxp_regionList, maxp_regionSpatialAttr])
realLabelsList = []
realmaxpv = max(pv_list)
for ipv, pv in enumerate(pv_list):
if pv == realmaxpv:
realLabelsList.append(labels_list[ipv])
real_values = [realmaxpv, realLabelsList]
return real_values
def growClusterForPoly(
labels, threshold_array, P, NeighborPolys, C, weight, spatialThre
):
"""Grow one region until threshold constraint is satisfied.
Parameters
----------
labels : list, required
A list of current region labels
threshold_array : array, required
An array of the values of the spatial extensive attribute.
P : int, required
The index of current area unit
NeighborPolys : list, required
The neighbors of current area unit
C : int, required
The index of current region
weight : libpysal.weights.W, required
Weights object created from given data
spatialThre : {int, float}, required
The threshold value.
Returns
-------
cluster_info : tuple
``labeledID``, ``spatialAttrTotal``
"""
labels[P] = C
labeledID = [P]
spatialAttrTotal = threshold_array[P]
i = 0
while i < len(NeighborPolys):
if spatialAttrTotal >= spatialThre:
break
Pn = NeighborPolys[i]
if labels[Pn] == 0:
labels[Pn] = C
labeledID.append(Pn)
spatialAttrTotal += threshold_array[Pn]
if spatialAttrTotal < spatialThre:
PnNeighborPolys = weight.neighbors[Pn]
for pnn in PnNeighborPolys:
if pnn not in NeighborPolys:
NeighborPolys.append(pnn)
i += 1
cluster_info = labeledID, spatialAttrTotal
return cluster_info
def assignEnclave(
enclave,
labels,
regionList,
regionSpatialAttr,
threshold_array,
weight,
distance_matrix,
random_assign=1,
):
"""Assign the enclaves to the regions identified in the region growth phase.
Parameters
----------
enclave : list, required
A list of enclaves.
labels : list, required
A list of region labels for area units.
regionList : dict, required
A dictionary with key as region ID and value as a list of area
units assigned to the region.
regionSpatialAttr : dict, required
A dictionary with key as region ID and value as the total
spatial extensive attribute of the region.
threshold_array : array, required
An array of the values of the spatial extensive attribute.
weight : libpysal.weights.W, required
Weights object created from given data
distance_matrix : array, required
A square-form distance matrix for the attributes.
random_assign : int, required
The number of top candidate regions to consider for enclave assignment.
Returns
-------
region_info : list
Deep copies of ``labels``, ``regionList``, and ``regionSpatialAttr``
"""
enclave_index = 0
while len(enclave) > 0:
ec = enclave[enclave_index]
ecNeighbors = weight.neighbors[ec]
assignedRegion = 0
ecNeighborsList = []
for ecn in ecNeighbors:
if ecn in enclave:
continue
rm = np.array(regionList[labels[ecn]])
totalDistance = distance_matrix[ec, rm].sum()
ecNeighborsList.append((ecn, totalDistance))
ecNeighborsList = sorted(ecNeighborsList, key=lambda tup: tup[1])
top_num = min([len(ecNeighborsList), random_assign])
if top_num > 0:
ecn_index = np.random.randint(top_num)
assignedRegion = labels[ecNeighborsList[ecn_index][0]]
if assignedRegion == 0:
enclave_index += 1
else:
labels[ec] = assignedRegion
regionList[assignedRegion].append(ec)
regionSpatialAttr[assignedRegion] += threshold_array[ec]
del enclave[enclave_index]
enclave_index = 0
region_info = [deepcopy(labels), deepcopy(regionList), deepcopy(regionSpatialAttr)]
return region_info
def calculateWithinRegionDistance(regionList, distance_matrix):
"""Calculate total wthin-region distance/dissimilarity.
Parameters
----------
regionList : dict, required
A dictionary with key as region ID and value as a list of area
units assigned to the region.
distance_matrix : array, required
A square-form distance matrix for the attributes.
Returns
-------
totalWithinRegionDistance : {int, float}
the total within-region distance
"""
totalWithinRegionDistance = 0
for k, v in regionList.items():
nv = np.array(v)
regionDistance = distance_matrix[nv, :][:, nv].sum() / 2
totalWithinRegionDistance += regionDistance
return totalWithinRegionDistance
def pickMoveArea(
labels,
regionLists,
regionSpatialAttrs,
threshold_array,
weight,
distance_matrix,
threshold,
):
"""Pick a spatial unit that can move from one region to another.
Parameters
----------
labels : list, required
A list of current region labels
regionLists : dict, required
A dictionary with key as region ID and value as a list of area
units assigned to the region.
regionSpatialAttrs : dict, required
A dictionary with key as region ID and value as the total
spatial extensive attribute of the region.
threshold_array : array, required
An array of the values of the spatial extensive attribute.
weight :libpysal.weights.W, required
Weights object created from given data
threshold : {int, float}, required
The threshold value.
Returns
-------
potentialAreas : list
a list of area units that can move without violating
contiguity and threshold constraints
"""
potentialAreas = []
for k, v in regionSpatialAttrs.items():
rla = np.array(regionLists[k])
rasa = threshold_array[rla]
lostSA = v - rasa
pas_indices = np.where(lostSA > threshold)[0]
if pas_indices.size > 0:
for pasi in pas_indices:
leftAreas = np.delete(rla, pasi)
ws = weight.sparse
cc = connected_components(ws[leftAreas, :][:, leftAreas])
if cc[0] == 1:
potentialAreas.append(rla[pasi])
else:
continue
return potentialAreas
def checkMove(
poa, labels, regionLists, threshold_array, weight, distance_matrix, threshold
):
"""Calculate the dissimilarity increase/decrease from one potential move.
Parameters
----------
poa : int, required
The index of current area unit that can potentially move
labels : list, required
A list of current region labels
regionLists : dict, required
A dictionary with key as region ID and value as a list of area
units assigned to the region.
threshold_array : array, required
An array of the values of the spatial extensive attribute.
weight : libpysal.weights.W, required
Weights object created from given data
distance_matrix : array, required
A square-form distance matrix for the attributes.
threshold : {int, float}, required
The threshold value.
Returns
-------
move_info : list
``lostDistance``, ``minAddedDistance``, and ``potentialMove``.
"""
poaNeighbor = weight.neighbors[poa]
donorRegion = labels[poa]
rm = np.array(regionLists[donorRegion])
lostDistance = distance_matrix[poa, rm].sum()
potentialMove = None
minAddedDistance = np.Inf
for poan in poaNeighbor:
recipientRegion = labels[poan]
if donorRegion != recipientRegion:
rm = np.array(regionLists[recipientRegion])
addedDistance = distance_matrix[poa, rm].sum()
if addedDistance < minAddedDistance:
minAddedDistance = addedDistance
potentialMove = (poa, donorRegion, recipientRegion)
move_info = [lostDistance, minAddedDistance, potentialMove]
return move_info
def performSA(
initLabels,
initRegionList,
initRegionSpatialAttr,
threshold_array,
weight,
distance_matrix,
threshold,
alpha,
tabuLength,
max_no_move,
):
"""Perform the tabu list integrated simulated annealing algorithm.
Parameters
----------
initLabels : list, required
A list of initial region labels before SA
initRegionList : dict, required
A dictionary with key as region ID and value as a list of area
units assigned to the region before SA.
initRegionSpatialAttr : dict, required
A dictionary with key as region ID and value as the total
spatial extensive attribute of the region before SA.
threshold_array : array, required
An array of the values of the spatial extensive attribute.
weight : libpysal.weights.W, required
Weights object created from given data.
distance_matrix : array, required
A square-form distance matrix for the attributes.
threshold : {int, float}, required
The threshold value.
alpha : float between 0 and 1, required
Temperature cooling rate
tabuLength : int, required
Max length of a tabuList
max_no_move : int, required
Max number of none improving movements
Returns
-------
sa_res : list
The results from simulated annealing including ``labels``,
``regionLists``, and ``regionSpatialAttrs``.
"""
t = 1
ni_move_ct = 0
make_move_flag = False
tabuList = []
potentialAreas = []
labels = deepcopy(initLabels)
regionLists = deepcopy(initRegionList)
regionSpatialAttrs = deepcopy(initRegionSpatialAttr)
while ni_move_ct <= max_no_move:
if len(potentialAreas) == 0:
potentialAreas = pickMoveArea(
labels,
regionLists,
regionSpatialAttrs,
threshold_array,
weight,
distance_matrix,
threshold,
)
if len(potentialAreas) == 0:
break
poa = potentialAreas[np.random.randint(len(potentialAreas))]
lostDistance, minAddedDistance, potentialMove = checkMove(
poa,
labels,
regionLists,
threshold_array,
weight,
distance_matrix,
threshold,
)
if potentialMove is None:
potentialAreas.remove(poa)
continue
diff = lostDistance - minAddedDistance
donorRegion = potentialMove[1]
recipientRegion = potentialMove[2]
if diff > 0:
make_move_flag = True
if (poa, recipientRegion, donorRegion) not in tabuList:
if len(tabuList) == tabuLength:
tabuList.pop(0)
tabuList.append((poa, recipientRegion, donorRegion))
ni_move_ct = 0
else:
ni_move_ct += 1
prob = np.exp(diff / t)
if prob > np.random.random() and potentialMove not in tabuList:
make_move_flag = True
else:
make_move_flag = False
potentialAreas.remove(poa)
if make_move_flag:
labels[poa] = recipientRegion
regionLists[donorRegion].remove(poa)
regionLists[recipientRegion].append(poa)
regionSpatialAttrs[donorRegion] -= threshold_array[poa]
regionSpatialAttrs[recipientRegion] += threshold_array[poa]
impactedAreas = []
for pa in potentialAreas:
if labels[pa] == recipientRegion or labels[pa] == donorRegion:
impactedAreas.append(pa)
for pa in impactedAreas:
potentialAreas.remove(pa)
t = t * alpha
return [labels, regionLists, regionSpatialAttrs]
[docs]class MaxPHeuristic(BaseSpOptHeuristicSolver):
"""The max-p-regions involves the aggregation of n areas into an
unknown maximum number of homogeneous regions, while ensuring that
each region is contiguious and satisfies a minimum threshold value
imposed on a predefined spatially extensive attribute.
Parameters
----------
gdf : geopandas.GeoDataFrame, required
Geodataframe containing original data.
w : libpysal.weights.W, required
Weights object created from given data.
attrs_name : list, required
Strings for attribute names (cols of ``geopandas.GeoDataFrame``).
threshold_name : string, required
The name of the spatial extensive attribute variable.
threshold : {int, float}, required
The threshold value.
top_n : int, required
The number of top candidate regions to consider for enclave assignment.
max_iterations_construction : int
Max number of iterations for construction phase.
max_iterations_SA : int
Max number of iterations for customized simulated annealing.
verbose : boolean
Set to ``True`` for reporting solution progress/debugging.
Default is ``False``.
policy : str
Defaults to ``'single'`` to attach infeasible components using a
single linkage between the area in the infeasible component
with the smallest nearest neighbor distance to an area in a
feasible component. ``'multiple'`` adds joins for each area
in an infeasible component and their nearest neighbor area in a
feasible component. ``'keep'`` attempts to solve without
modification (useful for debugging). ``'drop'`` removes areas in
infeasible components before solving.
Attributes
----------
max_p : int
The number of regions.
labels_ : numpy.array
Region IDs for observations.
Examples
--------
>>> import numpy
>>> import libpysal
>>> import geopandas as gpd
>>> from spopt.region.maxp import MaxPHeuristic
Read the data.
>>> pth = libpysal.examples.get_path("mexicojoin.shp")
>>> mexico = gpd.read_file(pth)
>>> mexico["count"] = 1
Create the weight.
>>> w = libpysal.weights.Queen.from_dataframe(mexico)
Define the columns of ``geopandas.GeoDataFrame`` to be spatially
extensive attribute.
>>> attrs_name = [f"PCGDP{year}" for year in range(1950, 2010, 10)]
Define the spatial extensive attribute variable and the threshold value.
>>> threshold_name = "count"
>>> threshold = 4
Run the max-p-regions algorithm.
>>> model = MaxPHeuristic(mexico, w, attrs_name, threshold_name, threshold)
>>> model.solve()
Get the number of regions and region IDs for unit areas.
>>> model.p
>>> model.labels_
"""
[docs] def __init__(
self,
gdf,
w,
attrs_name,
threshold_name,
threshold,
top_n=2,
max_iterations_construction=99,
max_iterations_sa=ITERSA,
verbose=False,
policy="single",
):
self.gdf = gdf
self.w = w
self.attrs_name = attrs_name
self.threshold_name = threshold_name
self.threshold = threshold
self.top_n = top_n
self.max_iterations_construction = max_iterations_construction
self.max_iterations_sa = max_iterations_sa
self.verbose = verbose
self.policy = policy
[docs] def solve(self):
"""Solve a max-p-regions problem and get back the results."""
max_p, label = maxp(
self.gdf,
self.w,
self.attrs_name,
self.threshold_name,
self.threshold,
self.top_n,
self.max_iterations_construction,
self.max_iterations_sa,
verbose=self.verbose,
policy=self.policy,
)
self.labels_ = label
self.p = max_p