If you don't know what is Schelling's model of segregation, you can read it here.
The Schelling model of segregation is an agent-based model that illustrates how individual tendencies regarding neighbors can lead to segregation. In the Schelling model, agents occupy cells of rectangular space. A cell can be occupied by a single agent only. Agents belong to one of two groups and are able to relocate according to the fraction of friends (i.e., agents of their own group) within a neighborhood around their location. The model's basic assumption is as follows: an agent, located in the center of a neighborhood where the fraction of friends f is less than a predefined tolerance threshold F (i.e., f < F), will try to relocate to a neighborhood for which the fraction of friends is at least f (i.e., f ≥ F)
I have written the following code to run the Schelling's model of segregation simulation.
import numpy as np
from shapely.geometry import Point
import geopandas as gp
from matplotlib import pyplot as plt
import shapely
import random
import itertools
import copy
import matplotlib.animation
import pandas as pd
class geo_schelling(object):
def __init__(self,shapefile,spacing,empty_ratio,similarity_threshhold,n_iterations,ratio,races=2):
self.shapefile=shapefile
self.spacing=spacing
self.empty_ratio=empty_ratio
self.similarity_threshhold=similarity_threshhold
self.n_iterations=n_iterations
self.ratio=ratio
self.races=races
self.shape_cali=gp.read_file(shapefile)
def generate_grid_in_polygon(self,spacing, polygon):
''' This Function generates evenly spaced points within the given
GeoDataFrame. The parameter 'spacing' defines the distance between
the points in coordinate units. '''
# Get the bounds of the polygon
minx, miny, maxx, maxy = polygon.bounds
# Now generate the entire grid
x_coords = list(np.arange(np.floor(minx), int(np.ceil(maxx)), spacing))
y_coords = list(np.arange(np.floor(miny), int(np.ceil(maxy)), spacing))
grid = [Point(x) for x in zip(np.meshgrid(x_coords, y_coords)[0].flatten(), np.meshgrid(x_coords, y_coords)[1].flatten())]
# Finally only keep the points within the polygon
list_of_points = [point for point in grid if point.within(polygon)]
return list(zip([point.x for point in list_of_points],[point.y for point in list_of_points]))
def populate(self):
self.all_counties=self.shape_cali.geometry
self.empty_houses=[]
self.agents={}
self.all_houses=[]
for county in self.all_counties:
if type(county)==shapely.geometry.multipolygon.MultiPolygon:
for j in county:
self.all_houses.extend(self.generate_grid_in_polygon(self.spacing,j))
else:
self.all_houses.extend(self.generate_grid_in_polygon(self.spacing,county))
random.shuffle(self.all_houses)
self.n_empty=int(self.empty_ratio*len(self.all_houses))
self.empty_houses=self.all_houses[:self.n_empty]
self.remaining_houses=self.all_houses[self.n_empty:]
divider=int(round(len(self.remaining_houses)*self.ratio))
houses_majority=self.remaining_houses[:divider]
houses_minority=self.remaining_houses[divider:]
self.agents.update(dict(zip(houses_majority,[1]*divider)))
self.agents.update(dict(zip(houses_minority,[2]*int(len(self.remaining_houses)-divider))))
return self.agents,self.empty_houses,len(self.all_houses)
def plot(self):
fig, ax = plt.subplots(figsize=(15,15))
agent_colors = {1:'b', 2:'r'}
for agent,county in itertools.zip_longest(self.agents,self.all_counties):
#ax.scatter(self.agent[0], self.agent[1], color=agent_colors[agents[agent]])
if type(county)==shapely.geometry.multipolygon.MultiPolygon:
for j in county:
x,y=j.exterior.xy
ax.plot(x,y)
elif county is None:
pass
else:
x,y=county.exterior.xy
ax.plot(x,y)
ax.scatter(agent[0], agent[1], color=agent_colors[self.agents[agent]])
ax.set_title("Simulation", fontsize=10, fontweight='bold')
ax.set_xticks([])
ax.set_yticks([])
def is_unsatisfied(self, x, y):
"""
Checking if an agent is unsatisfied or satisified at its current
position.
"""
race = self.agents[(x,y)]
count_similar = 0
count_different = 0
min_width=min(np.array(self.all_houses)[:,0])
max_width=max(np.array(self.all_houses)[:,0])
min_height=min(np.array(self.all_houses)[:,1])
max_height=max(np.array(self.all_houses)[:,1])
if x > min_width and y > min_height and (x-self.spacing, y-self.spacing) not in self.empty_houses:
if (x-self.spacing, y-self.spacing) in self.agents:
if self.agents[(x-self.spacing, y-self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if y > min_height and (x,y-self.spacing) not in self.empty_houses:
if (x,y-self.spacing) in self.agents:
if self.agents[(x,y-self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x < (max_width-self.spacing) and y > min_height and (x+self.spacing,y-self.spacing) not in self.empty_houses:
if (x+self.spacing,y-self.spacing) in self.agents:
if self.agents[(x+self.spacing,y-self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x > min_width and (x-self.spacing,y) not in self.empty_houses:
if (x-self.spacing,y) in self.agents:
if self.agents[(x-self.spacing,y)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x < (max_width-self.spacing) and (x+self.spacing,y) not in self.empty_houses:
if (x+self.spacing,y) in self.agents:
if self.agents[(x+self.spacing,y)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x > min_width and y < (max_height-self.spacing) and (x-self.spacing,y+self.spacing) not in self.empty_houses:
if (x-self.spacing,y+self.spacing) in self.agents:
if self.agents[(x-self.spacing,y+self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x > min_width and y < (max_height-self.spacing) and (x,y+self.spacing) not in self.empty_houses:
if (x,y+self.spacing) in self.agents:
if self.agents[(x,y+self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x < (max_width-self.spacing) and y < (max_height-self.spacing) and (x+self.spacing,y+self.spacing) not in self.empty_houses:
if (x+self.spacing,y+self.spacing) in self.agents:
if self.agents[(x+self.spacing,y+self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if (count_similar+count_different) == 0:
return False
else:
return float(count_similar)/(count_similar+count_different) < self.similarity_threshhold
def move_to_empty(self,x,y):
race = self.agents[(x,y)]
empty_house = random.choice(self.empty_houses)
self.updated_agents[empty_house] = race
del self.updated_agents[(x, y)]
self.empty_houses.remove(empty_house)
self.empty_houses.append((x, y))
def update_animate(self):
"""
Update the square on the basis of similarity threshhold. This is the
function which actually runs the simulation.
"""
fig, ax = plt.subplots(figsize=(15,15))
agent_colors = {1:'b', 2:'r'}
ax.set_xticks([])
ax.set_yticks([])
def update(i):
self.old_agents = copy.deepcopy(self.agents)
n_changes = 0
for agent,county in itertools.zip_longest(self.old_agents,self.all_counties):
#ax.scatter(self.agent[0], self.agent[1], color=agent_colors[agents[agent]])
if type(county)==shapely.geometry.multipolygon.MultiPolygon:
for j in county:
x,y=j.exterior.xy
ax.plot(x,y)
elif county is None:
pass
else:
x,y=county.exterior.xy
ax.plot(x,y)
ax.scatter(agent[0], agent[1], color=agent_colors[self.agents[agent]])
ax.set_title('Simulation', fontsize=10, fontweight='bold')
if self.is_unsatisfied(agent[0], agent[1]):
agent_race = self.agents[agent]
empty_house = random.choice(self.empty_houses)
self.agents[empty_house] = agent_race
del self.agents[agent]
self.empty_houses.remove(empty_house)
self.empty_houses.append(agent)
n_changes += 1
if n_changes==0:
return
ani = matplotlib.animation.FuncAnimation(fig, update, frames= self.n_iterations,repeat=False)
plt.show()
def update_normal(self):
"""
This function is the normal version of the update and doesn't include
any animation whatsoever as it is in the case of the update_animate
function.
"""
for i in range(self.n_iterations):
self.old_agents = copy.deepcopy(self.agents)
n_changes = 0
for agent in self.old_agents:
if self.is_unsatisfied(agent[0], agent[1]):
agent_race = self.agents[agent]
empty_house = random.choice(self.empty_houses)
self.agents[empty_house] = agent_race
del self.agents[agent]
self.empty_houses.remove(empty_house)
self.empty_houses.append(agent)
n_changes += 1
print(n_changes)
print(i)
if n_changes == 0:
break
def calculate_similarity(self):
"""
Checking if an agent is unsatisfied or satisified at its current
position.
"""
similarity = []
min_width=min(np.array(self.all_houses)[:,0])
max_width=max(np.array(self.all_houses)[:,0])
min_height=min(np.array(self.all_houses)[:,1])
max_height=max(np.array(self.all_houses)[:,1])
for agent in self.agents:
count_similar = 0
count_different = 0
x = agent[0]
y = agent[1]
race = self.agents[(x,y)]
if x > min_width and y > min_height and (x-self.spacing, y-self.spacing) not in self.empty_houses:
if (x-self.spacing, y-self.spacing) in self.agents:
if self.agents[(x-self.spacing, y-self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if y > min_height and (x,y-self.spacing) not in self.empty_houses:
if (x,y-self.spacing) in self.agents:
if self.agents[(x,y-self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x < (max_width-self.spacing) and y > min_height and (x+self.spacing,y-self.spacing) not in self.empty_houses:
if (x+self.spacing,y-self.spacing) in self.agents:
if self.agents[(x+self.spacing,y-self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x > min_width and (x-self.spacing,y) not in self.empty_houses:
if (x-self.spacing,y) in self.agents:
if self.agents[(x-self.spacing,y)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x < (max_width-self.spacing) and (x+self.spacing,y) not in self.empty_houses:
if (x+self.spacing,y) in self.agents:
if self.agents[(x+self.spacing,y)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x > min_width and y < (max_height-self.spacing) and (x-self.spacing,y+self.spacing) not in self.empty_houses:
if (x-self.spacing,y+self.spacing) in self.agents:
if self.agents[(x-self.spacing,y+self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x > min_width and y < (max_height-self.spacing) and (x,y+self.spacing) not in self.empty_houses:
if (x,y+self.spacing) in self.agents:
if self.agents[(x,y+self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if x < (max_width-self.spacing) and y < (max_height-self.spacing) and (x+self.spacing,y+self.spacing) not in self.empty_houses:
if (x+self.spacing,y+self.spacing) in self.agents:
if self.agents[(x+self.spacing,y+self.spacing)] == race:
count_similar += 1
else:
count_different += 1
else:
pass
if (count_similar+count_different) == 0:
return False
else:
return float(count_similar)/(count_similar+count_different) < self.similarity_threshhold
try:
similarity.append(float(count_similar)/(count_similar+count_different))
except:
similarity.append(1)
return sum(similarity)/len(similarity)
def get_data_by_county(self):
"""
Return all the data by counties.
"""
df=pd.DataFrame(columns=['County Name','Majority Population (Number)', 'Minority Population (Number)'])
for county,name in zip(self.shape_cali.geometry,self.shape_cali.NAME):
minority_num=0
majority_num=0
for agent in self.agents:
if Point(agent).within(county):
if self.agents[agent]==1:
majority_num+=1
if self.agents[agent]==2:
minority_num+=1
dic={'County Name':[name],'Majority Population (Number)':[majority_num],'Minority Population (Number)':[minority_num]}
df=df.append(pd.DataFrame(dic),ignore_index=True)
df['Total Population']=df['Majority Population (Number)']+df['Minority Population (Number)']
df['Majority Population (%)']=df[['Total Population','Majority Population (Number)']].apply(lambda x:0 if x['Total Population']==0 else x['Majority Population (Number)']/x['Total Population'],axis=1)
df['Minority Population (%)']=df[['Total Population','Minority Population (Number)']].apply(lambda x:0 if x['Total Population']==0 else x['Minority Population (Number)']/x['Total Population'],axis=1)
return df
shapefile='CA.shp'
spacing=0.20
empty_ratio=0.30
similarity_threshhold=0.01
n_iterations=100
ratio=0.535
You can get the shapefile here if you want to try it. So the above implementation is fine but the runtime is very slow. I want to optimize the following methods is_unsatisfied,generate_grid_in_polygon . Is it possible to speed up these functions with numba or parallelization? Or any other suggestions are welcome!