# Get the centroid or representative points of the road segments
road_centroids = np.array([geom.centroid.coords[0] for geom in road_seg.geometry])
# Build the KDTree based on the centroids of the road segments
road_tree = cKDTree(road_centroids)
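# Note: cKDTree.query(point) returns a (distance, index) pair, where the index refers to the row
# order of road_centroids (and therefore of road_seg); this is used below as a fallback for
# parcels that have no address-based match.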
# Initialize a list to store the matched road geometries
matched_road_geometries = []
# Iterate over each row in parcel
for idx, parcel_row in parcel.iterrows():
    # Only treat Found_Match as a hit when it is literally True (NaN == True is False, so missing values fall through to the else branch)
    if parcel_row['Found_Match'] == True:
match_addr = parcel_row['match_road_address']
# Filter road_seg to get rows where road_addr matches match_road_address
matching_road_segs = road_seg[road_seg['road_addr'] == match_addr]
if not matching_road_segs.empty:
            # Calculate distances between the parcel polygon and the matching road_seg geometries
            distances = matching_road_segs.geometry.distance(parcel_row.geometry)
# Find the index of the nearest road geometry
nearest_index = distances.idxmin()
# Append the nearest road geometry to the list
matched_road_geometries.append(matching_road_segs.loc[nearest_index].geometry)
else:
# If no match is found, append None or an empty geometry
matched_road_geometries.append(None)
else:
# If Found_Match is False or NaN, find the nearest road geometry
# Get the centroid of the current parcel polygon
parcel_centroid = np.array(parcel_row.geometry.centroid.coords[0])
# Query the KDTree for the nearest road segment
_, nearest_index = road_tree.query(parcel_centroid)
# Append the nearest road geometry to the list
matched_road_geometries.append(road_seg.iloc[nearest_index].geometry)
# Add the matched road geometries to parcel
parcel['road_geometry'] = matched_road_geometries
# %%
# Function to explode Polygons into individual boundary line segments
def explode_to_lines(gdf):
# Create a list to store new rows
line_list = []
for index, row in gdf.iterrows():
# Get the exterior boundary of the polygon
exterior = row['geometry'].exterior
# Convert the boundary into LineString segments
lines = [LineString([exterior.coords[i], exterior.coords[i + 1]])
for i in range(len(exterior.coords) - 1)]
# Create new rows for each line segment, retaining the original attributes
for line in lines:
new_row = row.copy()
new_row['geometry'] = line
line_list.append(new_row)
    # Build a DataFrame from the collected rows (one row per boundary segment)
    line_gdf = pd.DataFrame(line_list)
line_gdf = gpd.GeoDataFrame(line_gdf, geometry='geometry', crs=gdf.crs)
return line_gdf
# Call the function to explode the line segments
parcel_seg = explode_to_lines(parcel)
# Reset the index by group
parcel_seg['new_index'] = parcel_seg.groupby('parcel_id').cumcount()
parcel_seg.set_index('new_index', inplace=True)
parcel_seg.index.name = None
# Function to calculate the bearing of a geometry
def fun_bearing_ra(geom):
coords = np.array(geom.coords)
# Use the first and last coordinates to calculate the bearing
x1, y1 = coords[0]
x2, y2 = coords[-1]
# Calculate the bearing using atan2
bearing = math.atan2(y2 - y1, x2 - x1)
return bearing
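# Illustrative values (not used by the pipeline): a due-east segment (0, 0) -> (1, 0) has a bearing
# of 0 rad, a due-north segment (0, 0) -> (0, 1) has pi/2, and a due-west segment has pi, since
# atan2 measures the angle counter-clockwise from the positive x-axis.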
def calculate_angle_difference(line1, line2):
bearing1 = fun_bearing_ra(line1)
bearing2 = fun_bearing_ra(line2)
# Calculate the absolute angle difference and ensure it is <= 180 degrees
delta_theta = bearing2 - bearing1
# Ensure the angle is between -π and π
if delta_theta > math.pi:
delta_theta -= 2 * math.pi
elif delta_theta < -math.pi:
delta_theta += 2 * math.pi
# Convert the angle to degrees
angle_between_degrees = math.degrees(abs(delta_theta))
# Return the smaller angle difference (angle or its supplement)
return min(angle_between_degrees, 180 - angle_between_degrees)
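# Illustrative example (hypothetical coordinates): for (0, 0)->(1, 0) and (1, 0)->(0, 1) the raw
# bearings differ by 135 degrees, and the function returns min(135, 45) = 45, i.e. the acute angle
# between the two segment directions regardless of which way each segment happens to be drawn.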
# Check if two segments share a common point (i.e., their start or end point is the same)
def are_segments_connected(line1, line2):
coords1 = np.array(line1.coords)
coords2 = np.array(line2.coords)
# Check if the start or end points of the segments are the same
if np.all(coords1[0] == coords2[0]) or np.all(coords1[0] == coords2[-1]) or \
np.all(coords1[-1] == coords2[0]) or np.all(coords1[-1] == coords2[-1]):
return True
return False
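# For example, LineString([(0, 0), (1, 0)]) and LineString([(1, 0), (1, 1)]) are reported as
# connected because they share the endpoint (1, 0); segments that merely cross mid-way are not.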
# Function to reorder segments based on the turning point
def reorder_segments_by_turning_point(segments, turning_point_index):
# Reorder segments starting from the identified turning point
reordered_segments = segments[turning_point_index:] + segments[:turning_point_index]
return reordered_segments
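# For example, reorder_segments_by_turning_point(['a', 'b', 'c', 'd'], 2) returns
# ['c', 'd', 'a', 'b']: the list is rotated so the first turning point becomes the new start.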
# Main function: Process each parcel_id group and return a new GeoDataFrame
def process_parcel_segments(parcel_seg):
merged_segments = [] # List to store the reordered segments
# Group the parcel segments by parcel_id and process each group
for object_id, group in parcel_seg.groupby('parcel_id'):
segments = group['geometry'].tolist() # Get the list of line segments for the current group
original_indices = group.index.tolist() # Preserve the original indices
turning_points = []
# Loop through all adjacent segments to calculate angle differences
for i in range(1, len(segments)):
if are_segments_connected(segments[i-1], segments[i]):
angle_diff = calculate_angle_difference(segments[i-1], segments[i])
if angle_diff > 15: # If angle difference is greater than 15 degrees, mark it as a turning point
turning_points.append(i)
# If there are turning points, reorder the segments starting from the first turning point
if turning_points:
turning_point_index = turning_points[0]
reordered_segments = reorder_segments_by_turning_point(segments, turning_point_index)
reordered_original_indices = reorder_segments_by_turning_point(original_indices, turning_point_index)
else:
# If no turning points, retain the original order
reordered_segments = segments
reordered_original_indices = original_indices
# Store the reordered segments and their attributes
for j, (line, original_index) in enumerate(zip(reordered_segments, reordered_original_indices)):
row = group.iloc[0].copy() # Copy the first row's attributes
row['geometry'] = line
row['original_index'] = original_index # Preserve the original index
row['new_index'] = j # Assign the new index based on the reordered list
merged_segments.append(row)
    # Create a new GeoDataFrame for the reordered segments, keeping the CRS and active geometry
    updated_gdf = gpd.GeoDataFrame(merged_segments, columns=parcel_seg.columns.tolist() + ['original_index', 'new_index'],
                                   geometry='geometry', crs=parcel_seg.crs)
updated_gdf = updated_gdf.reset_index(drop=True)
return updated_gdf
# Run the main function and get the new GeoDataFrame
updated_parcel_seg = process_parcel_segments(parcel_seg)
parcel_seg = updated_parcel_seg
# Group parcel_seg by parcel_id and process each group
merged_segments = []
for object_id, group in parcel_seg.groupby('parcel_id'):
# Get the list of geometries in the current group
segments = group.geometry.tolist()
    # Start with the first segment
    merged_lines = [segments[0]]
for i in range(1, len(segments)):
connected = False
# Always compare the current segment with the previous one
if are_segments_connected(segments[i-1], segments[i]):
# Calculate the angle difference between the current segment and the previous one
angle_diff = calculate_angle_difference(segments[i-1], segments[i])
# If the angle difference is less than 15 degrees, merge the adjacent line segments
if angle_diff < 15:
# Merge the current and previous segments
merged_result = linemerge([merged_lines[-1], segments[i]])
                # Accept the merge only if the result is a single LineString
                if isinstance(merged_result, LineString):
                    merged_lines[-1] = merged_result
                    connected = True
                # Otherwise the pieces are not contiguous (MultiLineString); leave
                # connected as False so the current segment is kept as a new line below
# If no connected segment is found or the angle difference is too large, add the current segment as a new one
if not connected:
merged_lines.append(segments[i])
# Keep the merged results and add other attributes
for line in merged_lines:
row = group.iloc[0].copy() # Copy the first attribute row from the group
row['geometry'] = line
merged_segments.append(row)
# Create a new GeoDataFrame from the merged line segments
parcel_seg = gpd.GeoDataFrame(merged_segments, columns=parcel_seg.columns, geometry='geometry', crs=parcel_seg.crs)
# Check for MultiLineString geometries and explode them into LineString
exploded_segments = []
for index, row in parcel_seg.iterrows():
geom = row['geometry']
if isinstance(geom, MultiLineString):
# Explode the MultiLineString into individual LineStrings
        for line in geom.geoms:
new_row = row.copy()
new_row['geometry'] = line
exploded_segments.append(new_row)
else:
# Keep the original LineString geometries
exploded_segments.append(row)
# Create a new GeoDataFrame from the exploded segments
parcel_seg = gpd.GeoDataFrame(exploded_segments, columns=parcel_seg.columns, geometry='geometry', crs=parcel_seg.crs)
# extract useful columns
parcel_seg.drop(columns=['original_index', 'new_index'], inplace=True)
# Reset the index of the final GeoDataFrame
parcel_seg = parcel_seg.reset_index(drop=True)
edge_counts = parcel_seg.groupby('parcel_id').size()
parcel_seg['edge_num'] = parcel_seg['parcel_id'].map(edge_counts)
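# edge_num records how many merged boundary segments each parcel has; e.g. every row of a parcel
# whose boundary reduced to three segments carries edge_num == 3, and only that subset is passed
# to the curve-filtering step below.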
# Function to create tangent lines at both ends of a line segment
def create_tangents(line):
coords = list(line.coords)
if len(coords) < 2:
return None, None # Skip invalid geometries
# Create tangents at the start and end of the line segment
start_tangent = LineString([coords[0], coords[1]])
end_tangent = LineString([coords[-2], coords[-1]])
return start_tangent, end_tangent
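# For example, for LineString([(0, 0), (1, 0), (2, 1)]) the start tangent is the segment
# (0, 0)-(1, 0) and the end tangent is (1, 0)-(2, 1); comparing their bearings below gives a
# rough measure of how much a segment curves overall.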
# Function to filter curve segments based on angle difference of tangents > 30 degrees
def filter_curve_segments(parcel_seg, angle_threshold=30):
filtered_segments = []
non_filtered_segments = []
for idx, row in parcel_seg.iterrows():
line = row['geometry']
start_tangent, end_tangent = create_tangents(line)
if start_tangent and end_tangent:
angle_diff = calculate_angle_difference(start_tangent, end_tangent)
row_dict = row.to_dict() # Convert the entire row to a dictionary
row_dict['index'] = idx # Preserve the original index
if angle_diff > angle_threshold:
# Add the entire row to the filtered list
filtered_segments.append(row_dict)
else:
# Add the entire row to the non-filtered list
non_filtered_segments.append(row_dict)
# Create DataFrames with the filtered and non-filtered results if data exists
if filtered_segments:
filtered_df = pd.DataFrame(filtered_segments).set_index('index')
filtered_gdf = gpd.GeoDataFrame(filtered_df, crs=parcel_seg.crs, geometry=filtered_df['geometry'])
else:
# Initialize an empty GeoDataFrame with the same structure if no data
        filtered_gdf = gpd.GeoDataFrame(columns=parcel_seg.columns, geometry='geometry', crs=parcel_seg.crs)
if non_filtered_segments:
non_filtered_df = pd.DataFrame(non_filtered_segments).set_index('index')
non_filtered_gdf = gpd.GeoDataFrame(non_filtered_df, crs=parcel_seg.crs, geometry=non_filtered_df['geometry'])
else:
# Initialize an empty GeoDataFrame with the same structure if no data
        non_filtered_gdf = gpd.GeoDataFrame(columns=parcel_seg.columns, geometry='geometry', crs=parcel_seg.crs)
return filtered_gdf, non_filtered_gdf
# Call the function to filter curve segments and create two GeoDataFrames
filtered_parcel_seg, non_filtered_parcel_seg = filter_curve_segments(parcel_seg[parcel_seg['edge_num'] == 3])
# Function to create tangent lines and reverse the line if necessary
def create_tangents_with_reversal(line):
coords = list(line.coords)
if len(coords) < 2:
        return None, None, None  # Skip invalid geometries (the caller unpacks three values)
    # Orient the line from the endpoint with the smaller y-coordinate (southern end) to the one with the larger y
    if coords[0][1] < coords[-1][1]:  # The first point is already the southern endpoint
        start_point = coords[0]
        end_point = coords[-1]
    else:  # Otherwise the last point is the southern endpoint
        start_point = coords[-1]
        end_point = coords[0]
# Reverse the line if start_point is not the same as coords[0]
if start_point != coords[0]:
coords.reverse() # Reverse the order of coordinates
# Now create tangents based on the (possibly reversed) coordinates
start_tangent = LineString([coords[0], coords[1]]) # Tangent from the first to the second point
end_tangent = LineString([coords[-2], coords[-1]]) # Tangent from the second last to the last point
return start_tangent, end_tangent, LineString(coords) # Return the tangents and the (possibly reversed) line
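# For example, a line drawn from (0, 5) down to (0, 0) is reversed so that it runs south-to-north,
# and the tangents are then taken from the (possibly reversed) coordinate order.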
# Function to find the split point where the cumulative bearing change reaches a given fraction of the total angle difference
def calculate_split_point(line, start_tangent, end_tangent, angle_diff, angle_fraction=0.5):
coords = list(line.coords)
    # Walk along the line and return the first vertex at which the bearing change from the start tangent reaches angle_fraction of the total difference
for i in range(1, len(coords) - 1):
intermediate_tangent = LineString([coords[i - 1], coords[i]])
current_angle_diff = calculate_angle_difference(start_tangent, intermediate_tangent)
if current_angle_diff >= angle_diff * angle_fraction:
return coords[i] # Return the split point
return coords[-1] # If no point found, return the endpoint
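# For example, with a total angle difference of 40 degrees and the default angle_fraction of 0.5,
# the function returns the first vertex at which the local tangent has turned at least 20 degrees
# away from the start tangent (or the final endpoint if no such vertex exists).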
# Function to process each segment in filtered_parcel_seg
def process_filtered_parcel_seg(filtered_parcel_seg, angle_threshold=30, angle_fraction=0.5):
new_data = []
for idx, row in filtered_parcel_seg.iterrows():
line = row['geometry']
# Apply the tangent and reversal function
start_tangent, end_tangent, adjusted_line = create_tangents_with_reversal(line)
if start_tangent and end_tangent:
angle_diff = calculate_angle_difference(start_tangent, end_tangent)
if angle_diff > angle_threshold:
# Calculate the split point based on the angle difference and fraction
split_point = calculate_split_point(adjusted_line, start_tangent, end_tangent, angle_diff, angle_fraction)
# Add split point to row's data
row_dict = row.to_dict()
row_dict['split_point'] = Point(split_point) # Store the split point as geometry
row_dict['index'] = idx # Store the original index
new_data.append(row_dict)
else:
# If no split needed, just keep the original row
row_dict = row.to_dict()
row_dict['split_point'] = None # No split point, store None
row_dict['index'] = idx # Store the original index
new_data.append(row_dict)
# Convert the processed data back into a GeoDataFrame
new_df = pd.DataFrame(new_data).set_index('index') # Use original index
    new_gdf = gpd.GeoDataFrame(new_df, crs=filtered_parcel_seg.crs, geometry='split_point')
return new_gdf
# Check if filtered_parcel_seg is non-empty before processing
if not filtered_parcel_seg.empty:
# Call the function to process the filtered_parcel_seg
processed_parcel_seg = process_filtered_parcel_seg(filtered_parcel_seg)
else:
# Handle the case where filtered_parcel_seg is empty
processed_parcel_seg = gpd.GeoDataFrame(columns=filtered_parcel_seg.columns, crs=parcel_seg.crs)
# Function to split filtered_parcel_seg using points from processed_parcel_seg
def split_lines_with_points(filtered_parcel_seg, processed_parcel_seg):
split_segments = []
for idx, row in filtered_parcel_seg.iterrows():
line = row['geometry']
split_point_geom = processed_parcel_seg.loc[idx, 'split_point'] # Get the corresponding point geometry from split_point column
if isinstance(split_point_geom, Point):
# Check if the split point is on the line
if line.contains(split_point_geom):
# If the point is on the line, use it directly for splitting
split_lines = split(line, split_point_geom)
else:
# If the point is not on the line, find the closest point on the line
projected_distance = line.project(split_point_geom)
nearest_point = line.interpolate(projected_distance)
split_lines = split(line, nearest_point)
# Handle GeometryCollection by extracting valid LineString geometries
if isinstance(split_lines, GeometryCollection):
split_segments.extend([{
**row.to_dict(), 'geometry': geom
} for geom in split_lines.geoms if isinstance(geom, LineString)])
continue # Skip to the next iteration
# If no valid split point or GeometryCollection, add the original row
split_segments.append(row.to_dict())
# Convert split_segments to a GeoDataFrame and return
    split_gdf = gpd.GeoDataFrame(split_segments, crs=filtered_parcel_seg.crs, geometry='geometry')
return split_gdf
# Check if both filtered_parcel_seg and processed_parcel_seg are non-empty before processing
if not filtered_parcel_seg.empty and not processed_parcel_seg.empty:
# Call the function to split lines based on points
split_parcel_seg = split_lines_with_points(filtered_parcel_seg, processed_parcel_seg)
else:
# Handle the case where one or both GeoDataFrames are empty
split_parcel_seg = gpd.GeoDataFrame(columns=filtered_parcel_seg.columns, crs=parcel_seg.crs)
# Function to combine split_parcel_seg and non_filtered_parcel_seg, ensuring parcel_id proximity
def combine_parcel_segs(split_parcel_seg, non_filtered_parcel_seg):
# Ensure both datasets contain the 'parcel_id' column
if 'parcel_id' not in split_parcel_seg.columns or 'parcel_id' not in non_filtered_parcel_seg.columns:
raise ValueError("Both datasets must contain the 'parcel_id' column.")
# Convert parcel_id to string to avoid type errors during sorting
split_parcel_seg['parcel_id'] = split_parcel_seg['parcel_id'].astype(str)
non_filtered_parcel_seg['parcel_id'] = non_filtered_parcel_seg['parcel_id'].astype(str)
# Concatenate the two GeoDataFrames and ensure 'crs' and 'geometry' are set
combined_parcel_seg = gpd.GeoDataFrame(
pd.concat([split_parcel_seg, non_filtered_parcel_seg], ignore_index=True),
        crs=parcel_seg.crs,  # Reuse the CRS of the module-level parcel_seg
geometry='geometry' # Ensure the geometry column is correctly set
)
# Sort by 'parcel_id' to ensure similar parcel_id are together
combined_parcel_seg_sorted = combined_parcel_seg.sort_values(by='parcel_id')
return combined_parcel_seg_sorted
# Combine the results whenever at least one of the two GeoDataFrames has data, so no edge_num == 3 segments are dropped
if not split_parcel_seg.empty or not non_filtered_parcel_seg.empty:
    # Call the function to combine the datasets
    reconstr_seg = combine_parcel_segs(split_parcel_seg, non_filtered_parcel_seg)
else:
    # Handle the case where both GeoDataFrames are empty
reconstr_seg = gpd.GeoDataFrame(columns=split_parcel_seg.columns, crs=parcel_seg.crs)
# Check if reconstr_seg is non-empty before concatenating
if not reconstr_seg.empty:
parcel_seg = pd.concat([parcel_seg[parcel_seg['edge_num'] != 3], reconstr_seg], ignore_index=True).reset_index(drop=True)
parcel_seg = parcel_seg.drop(columns=['edge_num'])
parcel_seg = parcel_seg.set_crs(parcel.crs, allow_override=True)
# %%
def normalize_linestring(line):
# Ensure the coordinates are in a consistent direction (smallest point first)
if isinstance(line, LineString):
coords = list(line.coords)
if coords[0] > coords[-1]:
coords.reverse() # Reverse the order of coordinates to normalize the direction
return LineString(coords)
else:
return line # If it's not a LineString, keep it as is
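# For example, LineString([(2, 2), (0, 0)]) is rewritten as LineString([(0, 0), (2, 2)]), so a
# boundary shared by two parcels compares equal regardless of the direction it was digitized in.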
def check_shared_sides_normalized(parcel_seg, threshold=0.1, distance_threshold=100):
"""
Check for shared sides in parcel_seg using cKDTree for faster neighbor searches.
Parameters:
- parcel_seg: GeoDataFrame containing parcel segments.
- threshold: float, minimum proportion of line length overlap to consider as a shared side.
- distance_threshold: float, maximum distance between line segment midpoints to be considered for comparison.
Returns:
- parcel_seg: GeoDataFrame with 'shared_side' column indicating whether a side is shared.
"""
    # Work on a positional (Range) index so DataFrame labels line up with the KDTree row order used below
    parcel_seg = parcel_seg.reset_index(drop=True)
    # Normalize all the geometry objects
    parcel_seg['normalized_geom'] = parcel_seg['geometry'].apply(normalize_linestring)
# Extract the midpoints of each line segment to build the KDTree
midpoints = np.array([line.interpolate(0.5, normalized=True).coords[0] for line in parcel_seg['normalized_geom']])
# Build cKDTree with midpoints
kdtree = cKDTree(midpoints)
# Initialize the 'shared_side' column as False
parcel_seg['shared_side'] = False
# Loop over each line and find nearby lines using KDTree
for i, line1 in parcel_seg.iterrows():
# Query the KDTree for neighbors within the distance_threshold
indices = kdtree.query_ball_point(midpoints[i], r=distance_threshold)
for j in indices:
if i != j: # Avoid comparing the line with itself
line2 = parcel_seg.iloc[j]
intersection = line1['normalized_geom'].intersection(line2['normalized_geom'])
if not intersection.is_empty:
# Calculate the proportion of overlap relative to the length of line1
overlap_ratio = intersection.length / line1['normalized_geom'].length
if overlap_ratio > threshold:
# If the overlap is greater than the threshold, mark as shared side
parcel_seg.at[i, 'shared_side'] = True
parcel_seg.at[j, 'shared_side'] = True
# Remove the temporarily generated 'normalized_geom' column
parcel_seg = parcel_seg.drop(columns=['normalized_geom'])
return parcel_seg
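# Note: distance_threshold is interpreted in the units of parcel_seg's CRS (e.g. metres or feet for
# a projected CRS); threshold, by contrast, is a unitless overlap ratio.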
parcel_seg = check_shared_sides_normalized(parcel_seg)