Commit 1f8e7e4a authored by Joseph Siddons's avatar Joseph Siddons
Browse files

feat: quadtree and octtree

parent 74c8fe47
"""
Functions for computing navigational information. Can be used to add
navigational information to DataFrames.
"""
from math import acos, asin, atan2, cos, sin, degrees, radians, sqrt
def gcd_slc(
lon0: float,
lat0: float,
lon1: float,
lat1: float,
) -> float:
"""
Compute great circle distance on earth surface between two locations.
Parameters
----------
lon0 : float
Longitude of position 0
lat0 : float
Latitude of position 0
lon1 : float
Longitude of position 1
lat1 : float
Latitude of position 1
Returns
-------
dist : float
Great circle distance between position 0 and position 1.
"""
if abs(lat0 - lat1) <= 1e-6 and abs(lon0 - lon1) <= 1e-6:
return 0
r_earth = 6371
# Convert to radians
lat0, lat1, lon0, lon1 = map(radians, [lat0, lat1, lon0, lon1])
return r_earth * acos(
sin(lat0) * sin(lat1) + cos(lat0) * cos(lat1) * cos(lon1 - lon0)
)
def haversine(
lon0: float,
lat0: float,
lon1: float,
lat1: float,
) -> float:
"""
Compute Haversine distance between two points.
Parameters
----------
lon0 : float
Longitude of position 0
lat0 : float
Latitude of position 0
lon1 : float
Longitude of position 1
lat1 : float
Latitude of position 1
Returns
-------
dist : float
Haversine distance between position 0 and position 1.
"""
lat0, lat1, dlon, dlat = map(
radians, [lat0, lat1, lon1 - lon0, lat1 - lat0]
)
if abs(dlon) < 1e-6 and abs(dlat) < 1e-6:
return 0
r_earth = 6371
a = sin(dlat / 2) ** 2 + cos(lat0) * cos(lat1) * sin(dlon / 2) ** 2
c = 2 * asin(sqrt(a))
return c * r_earth
def bearing(
lon0: float,
lat0: float,
lon1: float,
lat1: float,
) -> float:
"""
Compute the bearing of a track from (lon0, lat0) to (lon1, lat1).
Duplicated from geo-py
Parameters
----------
lon0 : float,
Longitude of start point
lat0 : float,
Latitude of start point
lon1 : float,
Longitude of target point
lat1 : float,
Latitude of target point
Returns
-------
bearing : float
The bearing from point (lon0, lat0) to point (lon1, lat1) in degrees.
"""
lon0, lat0, lon1, lat1 = map(radians, [lon0, lat0, lon1, lat1])
dlon = lon1 - lon0
numerator = sin(dlon) * cos(lat1)
denominator = cos(lat0) * sin(lat1) - (sin(lat0) * cos(lat1) * cos(dlon))
theta = atan2(numerator, denominator)
theta_deg = (degrees(theta) + 360) % 360
return theta_deg
def destination(
lon: float, lat: float, bearing: float, distance: float
) -> tuple[float, float]:
"""
Compute destination of a great circle path.
Compute the destination of a track started from 'lon', 'lat', with
'bearing'. Distance is in units of km.
Duplicated from geo-py
Parameters
----------
lon : float
Longitude of initial position
lat : float
Latitude of initial position
bearing : float
Direction of track
distance : float
Distance to travel
Returns
-------
destination : tuple[float, float]
Longitude and Latitude of final position
"""
lon, lat = radians(lon), radians(lat)
radians_bearing = radians(bearing)
r_earth = 6371
delta = distance / r_earth
lat2 = asin(
sin(lat) * cos(delta) + cos(lat) * sin(delta) * cos(radians_bearing)
)
numerator = sin(radians_bearing) * sin(delta) * cos(lat)
denominator = cos(delta) - sin(lat) * sin(lat2)
lon2 = lon + atan2(numerator, denominator)
lon2_deg = (degrees(lon2) + 540) % 360 - 180
lat2_deg = degrees(lat2)
return lon2_deg, lat2_deg
def midpoint(
lon0: float,
lat0: float,
lon1: float,
lat1: float,
) -> tuple[float, float]:
"""
Compute the midpoint of a great circle track
Parameters
----------
lon0 : float
Longitude of position 0
lat0 : float
Latitude of position 0
lon1 : float
Longitude of position 1
lat1 : float
Latitude of position 1
Returns
-------
lon, lat
Positions of midpoint between position 0 and position 1
"""
bear = bearing(lon0, lat0, lon1, lat1)
dist = haversine(lon0, lat0, lon1, lat1)
return destination(lon0, lat0, bear, dist / 2)
from datetime import datetime, timedelta
from .distance_metrics import haversine
class SpaceTimeRecord:
"""
ICOADS Record class.
This is a simple instance of an ICOARDS record, it requires position and
temporal data. It can optionally include a UID and extra data.
The temporal component was designed to use `datetime` values, however all
methods will work with numeric datetime information - for example a pentad,
timestamp, julian day, etc. Note that any uses within an OctTree and
SpaceTimeRectangle must also have timedelta values replaced with numeric
ranges in this case.
Equality is checked only on the required fields + UID if it is specified.
Parameters
----------
lon : float
Horizontal coordinate (longitude).
lat : float
Vertical coordinate (latitude).
datetime : datetime
Datetime of the record. Can also be a numeric value such as pentad.
Comparisons between Records with datetime and Records with numeric
datetime will fail.
uid : str | None
Unique Identifier.
**data
Additional data passed to the SpaceTimeRecord for use by other functions
or classes.
"""
def __init__(
self,
lon: float,
lat: float,
datetime: datetime,
uid: str | None = None,
**data,
) -> None:
self.lon = lon
self.lat = lat
self.datetime = datetime
self.uid = uid
for var, val in data.items():
setattr(self, var, val)
return None
def __str__(self) -> str:
return f"Record(x = {self.lon}, y = {self.lat}, datetime = {self.datetime}, uid = {self.uid})"
def __eq__(self, other: object) -> bool:
return (
isinstance(other, SpaceTimeRecord)
and self.lon == other.lon
and self.lat == other.lat
and self.datetime == other.datetime
and (not (self.uid or other.uid) or self.uid == other.uid)
)
class SpaceTimeRecords(list[SpaceTimeRecord]):
"""List of SpaceTimeRecords"""
class SpaceTimeRectangle:
"""
A simple Space Time SpaceTimeRectangle class.
This constructs a simple Rectangle object.
The defining coordinates are the centres of the box, and the extents
are the full width, height, and time extent.
Whilst the rectangle is assumed to lie on the surface of Earth, this is
a projection as the rectangle is defined by a longitude/latitude range.
The temporal components are defined in the same way as the spatial
components, that is that the `datetime` component (t) is the "centre", and
the time extent (dt) is the full time range of the box.
Parameters
----------
lon : float
Horizontal centre of the rectangle (longitude).
lat : float
Vertical centre of the rectangle (latitude).
datetime : datetime
Datetime centre of the rectangle.
w : float
Width of the rectangle (longitude range).
h : float
Height of the rectangle (latitude range).
dt : timedelta
Time extent of the rectangle.
"""
def __init__(
self,
lon: float,
lat: float,
datetime: datetime,
lon_range: float,
lat_range: float,
dt: timedelta,
) -> None:
self.lon = lon
self.lat = lat
self.lon_range = lon_range
self.lat_range = lat_range
self.datetime = datetime
self.dt = dt
def __str__(self) -> str:
return f"SpaceTimeRectangle(x = {self.lon}, y = {self.lat}, w = {self.lon_range}, h = {self.lat_range}, t = {self.datetime}, dt = {self.dt})"
def __eq__(self, other: object) -> bool:
return (
isinstance(other, SpaceTimeRectangle)
and self.lon == other.lon
and self.lat == other.lat
and self.lon_range == other.lon_range
and self.lat_range == other.lat_range
and self.datetime == other.datetime
and self.dt == other.dt
)
def contains(self, point: SpaceTimeRecord) -> bool:
"""Test if a point is contained within the SpaceTimeRectangle"""
return (
point.lon <= self.lon + self.lon_range / 2
and point.lon >= self.lon - self.lon_range / 2
and point.lat <= self.lat + self.lat_range / 2
and point.lat >= self.lat - self.lat_range / 2
and point.datetime <= self.datetime + self.dt / 2
and point.datetime >= self.datetime - self.dt / 2
)
def intersects(self, other: object) -> bool:
"""Test if another Rectangle object intersects this Rectangle"""
return isinstance(other, SpaceTimeRectangle) and not (
self.lon - self.lon_range / 2 > other.lon + other.lon_range / 2
or self.lon + self.lon_range / 2 < other.lon - other.lon_range / 2
or self.lat - self.lat_range / 2 > other.lat + other.lat_range / 2
or self.lat + self.lat_range / 2 < other.lat - other.lat_range / 2
or self.datetime - self.dt / 2 > other.datetime + other.dt / 2
or self.datetime + self.dt / 2 < other.datetime - other.dt / 2
)
def nearby(
self,
point: SpaceTimeRecord,
dist: float,
t_dist: timedelta,
) -> bool:
"""
Check if point is nearby the Rectangle
Determines if a SpaceTimeRecord that falls on the surface of Earth is
nearby to the rectangle in space and time. This calculation uses the
Haversine distance metric.
Distance from rectangle to point is challenging on the surface of a
sphere, this calculation will return false positives as a check based
on the distance from the centre of the rectangle to the corners, or
to its Eastern edge (if the rectangle crosses the equator) is used in
combination with the input distance.
The primary use-case of this method is for querying an OctTree for
nearby Records.
Parameters
----------
point : SpaceTimeRecord
dist : float,
t_dist : timedelta
Returns
-------
bool : True if the point is <= dist + max(dist(centre, corners))
"""
# QUESTION: Is this sufficient? Possibly it is overkill
corner_dist = max(
haversine(
self.lon,
self.lat,
self.lon + self.lon_range / 2,
self.lat + self.lat_range / 2,
),
haversine(
self.lon,
self.lat,
self.lon + self.lon_range / 2,
self.lat - self.lat_range / 2,
),
)
if (self.lat + self.lat_range / 2) * (
self.lat - self.lat_range / 2
) < 0:
corner_dist = max(
corner_dist,
haversine(self.lon, self.lat, self.lon + self.lon_range / 2, 0),
)
return (
haversine(self.lon, self.lat, point.lon, point.lat)
<= dist + corner_dist
and point.datetime - t_dist <= self.datetime + self.dt / 2
and point.datetime + t_dist >= self.datetime - self.dt / 2
)
class OctTree:
"""
A Simple OctTree class for PyCOADS.
Acts as a space-time OctTree on the surface of Earth, allowing for querying
nearby points faster than searching a full DataFrame. As SpaceTimeRecords
are added to the OctTree, the OctTree divides into 8 children as the
capacity is reached. Additional SpaceTimeRecords are then added to the
children where they fall within the child OctTree's boundary.
SpaceTimeRecords already part of the OctTree before divided are not
distributed to the children OctTrees.
Whilst the OctTree has a temporal component, and was designed to utilise
datetime / timedelta objects, numeric values and ranges can be used. This
usage must be consistent for the boundary and all SpaceTimeRecords that
are part of the OctTree. This allows for usage of pentad, timestamp,
Julian day, etc. as datetime values.
Parameters
----------
boundary : SpaceTimeRectangle
The bounding SpaceTimeRectangle of the QuadTree
capacity : int
The capacity of each cell, if max_depth is set then a cell at the
maximum depth may contain more points than the capacity.
depth : int
The current depth of the cell. Initialises to zero if unset.
max_depth : int | None
The maximum depth of the QuadTree. If set, this can override the
capacity for cells at the maximum depth.
"""
def __init__(
self,
boundary: SpaceTimeRectangle,
capacity: int = 5,
depth: int = 0,
max_depth: int | None = None,
) -> None:
self.boundary = boundary
self.capacity = capacity
self.depth = depth
self.max_depth = max_depth
self.points = SpaceTimeRecords()
self.divided: bool = False
return None
def __str__(self) -> str:
indent = " " * self.depth
out = f"{indent}OctTree:\n"
out += f"{indent}- boundary: {self.boundary}\n"
out += f"{indent}- capacity: {self.capacity}\n"
out += f"{indent}- depth: {self.depth}\n"
if self.max_depth:
out += f"{indent}- max_depth: {self.max_depth}\n"
if self.points:
out += f"{indent}- contents:\n"
out += f"{indent}- number of elements: {len(self.points)}\n"
for p in self.points:
out += f"{indent} * {p}\n"
if self.divided:
out += f"{indent}- with children:\n"
out += f"{self.northwestback}"
out += f"{self.northeastback}"
out += f"{self.southwestback}"
out += f"{self.southeastback}"
out += f"{self.northwestfwd}"
out += f"{self.northeastfwd}"
out += f"{self.southwestfwd}"
out += f"{self.southeastfwd}"
return out
def divide(self):
"""Divide the QuadTree"""
self.northwestfwd = OctTree(
SpaceTimeRectangle(
self.boundary.lon - self.boundary.lon_range / 4,
self.boundary.lat + self.boundary.lat_range / 4,
self.boundary.datetime + self.boundary.dt / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
self.boundary.dt / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.northeastfwd = OctTree(
SpaceTimeRectangle(
self.boundary.lon + self.boundary.lon_range / 4,
self.boundary.lat + self.boundary.lat_range / 4,
self.boundary.datetime + self.boundary.dt / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
self.boundary.dt / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.southwestfwd = OctTree(
SpaceTimeRectangle(
self.boundary.lon - self.boundary.lon_range / 4,
self.boundary.lat - self.boundary.lat_range / 4,
self.boundary.datetime + self.boundary.dt / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
self.boundary.dt / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.southeastfwd = OctTree(
SpaceTimeRectangle(
self.boundary.lon + self.boundary.lon_range / 4,
self.boundary.lat - self.boundary.lat_range / 4,
self.boundary.datetime + self.boundary.dt / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
self.boundary.dt / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.northwestback = OctTree(
SpaceTimeRectangle(
self.boundary.lon - self.boundary.lon_range / 4,
self.boundary.lat + self.boundary.lat_range / 4,
self.boundary.datetime - self.boundary.dt / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
self.boundary.dt / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.northeastback = OctTree(
SpaceTimeRectangle(
self.boundary.lon + self.boundary.lon_range / 4,
self.boundary.lat + self.boundary.lat_range / 4,
self.boundary.datetime - self.boundary.dt / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
self.boundary.dt / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.southwestback = OctTree(
SpaceTimeRectangle(
self.boundary.lon - self.boundary.lon_range / 4,
self.boundary.lat - self.boundary.lat_range / 4,
self.boundary.datetime - self.boundary.dt / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
self.boundary.dt / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.southeastback = OctTree(
SpaceTimeRectangle(
self.boundary.lon + self.boundary.lon_range / 4,
self.boundary.lat - self.boundary.lat_range / 4,
self.boundary.datetime - self.boundary.dt / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
self.boundary.dt / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.divided = True
def _datetime_is_numeric(self) -> bool:
return not isinstance(self.boundary.datetime, datetime)
def insert(self, point: SpaceTimeRecord) -> bool:
"""
Insert a SpaceTimeRecord into the QuadTree.
Note that the SpaceTimeRecord can have numeric datetime values if that
is consistent with the OctTree.
"""
if not self.boundary.contains(point):
return False
elif self.max_depth and self.depth == self.max_depth:
self.points.append(point)
return True
elif len(self.points) < self.capacity:
self.points.append(point)
return True
else:
if not self.divided:
self.divide()
if self.northwestback.insert(point):
return True
elif self.northeastback.insert(point):
return True
elif self.southwestback.insert(point):
return True
elif self.southeastback.insert(point):
return True
elif self.northwestfwd.insert(point):
return True
elif self.northeastfwd.insert(point):
return True
elif self.southwestfwd.insert(point):
return True
elif self.southeastfwd.insert(point):
return True
return False
def query(
self,
rect: SpaceTimeRectangle,
points: SpaceTimeRecords | None = None,
) -> SpaceTimeRecords:
"""Get points that fall in a SpaceTimeRectangle"""
if not points:
points = SpaceTimeRecords()
if not self.boundary.intersects(rect):
return points
for point in self.points:
if rect.contains(point):
points.append(point)
if self.divided:
points = self.northwestfwd.query(rect, points)
points = self.northeastfwd.query(rect, points)
points = self.southwestfwd.query(rect, points)
points = self.southeastfwd.query(rect, points)
points = self.northwestback.query(rect, points)
points = self.northeastback.query(rect, points)
points = self.southwestback.query(rect, points)
points = self.southeastback.query(rect, points)
return points
def nearby_points(
self,
point: SpaceTimeRecord,
dist: float,
t_dist: timedelta,
points: SpaceTimeRecords | None = None,
) -> SpaceTimeRecords:
"""
Get all points that are nearby another point.
Query the OctTree to find all SpaceTimeRecords within the OctTree that
are nearby to the query SpaceTimeRecord. This search should be faster
than searching through all records, since only OctTree children whose
boundaries are close to the query SpaceTimeRecord are evaluated.
Parameters
----------
point : SpaceTimeRecord
The query point.
dist : float
The distance for comparison. Note that Haversine distance is used
as the distance metric as the query SpaceTimeRecord and OctTree are
assumed to lie on the surface of Earth.
t_dist : timedelta
Max time gap between SpaceTimeRecords within the OctTree and the
query SpaceTimeRecord. Can be numeric if the OctTree boundaries,
SpaceTimeRecords, and query SpaceTimeRecord have numeric datetime
values and ranges.
points : SpaceTimeRecords | None
List of SpaceTimeRecords already found. Most use cases will be to
not set this value, since it's main use is for passing onto the
children OctTrees.
Returns
-------
SpaceTimeRecords : A list of SpaceTimeRecords whose distance to the
query SpaceTimeRecord is <= dist, and the datetimes of the
SpaceTimeRecords fall within the datetime range of the query
SpaceTimeRecord.
"""
if not points:
points = SpaceTimeRecords()
if not self.boundary.nearby(point, dist, t_dist):
return points
for test_point in self.points:
if (
haversine(point.lon, point.lat, test_point.lon, test_point.lat)
<= dist
and test_point.datetime <= point.datetime + t_dist
and test_point.datetime >= point.datetime - t_dist
):
points.append(test_point)
if self.divided:
points = self.northwestback.nearby_points(
point, dist, t_dist, points
)
points = self.northeastback.nearby_points(
point, dist, t_dist, points
)
points = self.southwestback.nearby_points(
point, dist, t_dist, points
)
points = self.southeastback.nearby_points(
point, dist, t_dist, points
)
points = self.northwestfwd.nearby_points(
point, dist, t_dist, points
)
points = self.northeastfwd.nearby_points(
point, dist, t_dist, points
)
points = self.southwestfwd.nearby_points(
point, dist, t_dist, points
)
points = self.southeastfwd.nearby_points(
point, dist, t_dist, points
)
return points
"""
Constuctors for QuadTree classes that can decrease the number of comparisons
for detecting nearby records for example
"""
from datetime import datetime
from .distance_metrics import haversine
class Record:
"""
ICOADS Record class
Parameters
----------
x : float
Horizontal coordinate
y : float
Vertical coordinate
datetime : datetime | None
Datetime of the record
uid : str | None
Unique Identifier
"""
def __init__(
self,
lon: float,
lat: float,
datetime: datetime | None = None,
uid: str | None = None,
**data,
) -> None:
self.lon = lon
self.lat = lat
self.datetime = datetime
self.uid = uid
for var, val in data.items():
setattr(self, var, val)
return None
def __str__(self) -> str:
return f"Record(x = {self.lon}, y = {self.lat}, datetime = {self.datetime}, uid = {self.uid})"
def __eq__(self, other: object) -> bool:
return (
isinstance(other, Record)
and self.lon == other.lon
and self.lat == other.lat
and self.datetime == other.datetime
and (not (self.uid or other.uid) or self.uid == other.uid)
)
class Rectangle:
"""
A simple Rectangle class
Parameters
----------
x : float
Horizontal centre of the rectangle
y : float
Vertical centre of the rectangle
w : float
Width of the rectangle
h : float
Height of the rectangle
"""
def __init__(
self,
lon: float,
lat: float,
lon_range: float,
lat_range: float,
) -> None:
self.lon = lon
self.lat = lat
self.lon_range = lon_range
self.lat_range = lat_range
def __str__(self) -> str:
return f"Rectangle(x = {self.lon}, y = {self.lat}, w = {self.lon_range}, h = {self.lat_range})"
def __eq__(self, other: object) -> bool:
return (
isinstance(other, Rectangle)
and self.lon == other.lon
and self.lat == other.lat
and self.lon_range == other.lon_range
and self.lat_range == other.lat_range
)
def contains(self, point: Record) -> bool:
"""Test if a point is contained within the Rectangle"""
return (
point.lon <= self.lon + self.lon_range / 2
and point.lon >= self.lon - self.lon_range / 2
and point.lat <= self.lat + self.lat_range / 2
and point.lat >= self.lat - self.lat_range / 2
)
def intersects(self, other: object) -> bool:
"""Test if another Rectangle object intersects this Rectangle"""
return isinstance(other, Rectangle) and not (
self.lon - self.lon_range / 2 > other.lon + other.lon_range / 2
or self.lon + self.lon_range / 2 < other.lon - other.lon_range / 2
or self.lat - self.lat_range / 2 > other.lat + other.lat_range / 2
or self.lat + self.lat_range / 2 < other.lat - other.lat_range / 2
)
def nearby(
self,
point: Record,
dist: float,
) -> bool:
"""Check if point is nearby the Rectangle"""
# QUESTION: Is this sufficient? Possibly it is overkill
corner_dist = max(
haversine(
self.lon,
self.lat,
self.lon + self.lon_range / 2,
self.lat + self.lat_range / 2,
),
haversine(
self.lon,
self.lat,
self.lon + self.lon_range / 2,
self.lat - self.lat_range / 2,
),
)
if (self.lat + self.lat_range / 2) * (
self.lat - self.lat_range / 2
) < 0:
corner_dist = max(
corner_dist,
haversine(self.lon, self.lat, self.lon + self.lon_range / 2, 0),
)
return (
haversine(self.lon, self.lat, point.lon, point.lat)
<= dist + corner_dist
)
class QuadTree:
"""
A Simple QuadTree class for PyCOADS
Parameters
----------
boundary : Rectangle
The bounding Rectangle of the QuadTree
capacity : int
The capacity of each cell, if max_depth is set then a cell at the
maximum depth may contain more points than the capacity.
depth : int
The current depth of the cell. Initialises to zero if unset.
max_depth : int | None
The maximum depth of the QuadTree. If set, this can override the
capacity for cells at the maximum depth.
"""
def __init__(
self,
boundary: Rectangle,
capacity: int = 5,
depth: int = 0,
max_depth: int | None = None,
) -> None:
self.boundary = boundary
self.capacity = capacity
self.depth = depth
self.max_depth = max_depth
self.points: list[Record] = list()
self.divided: bool = False
return None
def __str__(self) -> str:
indent = " " * self.depth
out = f"{indent}QuadTree:\n"
out += f"{indent}- boundary: {self.boundary}\n"
out += f"{indent}- capacity: {self.capacity}\n"
out += f"{indent}- depth: {self.depth}\n"
if self.max_depth:
out += f"{indent}- max_depth: {self.max_depth}\n"
out += f"{indent}- contents: {self.points}\n"
if self.divided:
out += f"{indent}- with children:\n"
out += f"{self.northwest}"
out += f"{self.northeast}"
out += f"{self.southwest}"
out += f"{self.southeast}"
return out
def divide(self):
"""Divide the QuadTree"""
self.northwest = QuadTree(
Rectangle(
self.boundary.lon - self.boundary.lon_range / 4,
self.boundary.lat + self.boundary.lat_range / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.northeast = QuadTree(
Rectangle(
self.boundary.lon + self.boundary.lon_range / 4,
self.boundary.lat + self.boundary.lat_range / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.southwest = QuadTree(
Rectangle(
self.boundary.lon - self.boundary.lon_range / 4,
self.boundary.lat - self.boundary.lat_range / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.southeast = QuadTree(
Rectangle(
self.boundary.lon + self.boundary.lon_range / 4,
self.boundary.lat - self.boundary.lat_range / 4,
self.boundary.lon_range / 2,
self.boundary.lat_range / 2,
),
capacity=self.capacity,
depth=self.depth + 1,
max_depth=self.max_depth,
)
self.divided = True
def insert(self, point: Record) -> bool:
"""Insert a point into the QuadTree"""
if not self.boundary.contains(point):
return False
elif self.max_depth and self.depth == self.max_depth:
self.points.append(point)
return True
elif len(self.points) < self.capacity:
self.points.append(point)
return True
else:
if not self.divided:
self.divide()
if self.northwest.insert(point):
return True
elif self.northeast.insert(point):
return True
elif self.southwest.insert(point):
return True
elif self.southeast.insert(point):
return True
return False
def query(
self,
rect: Rectangle,
points: list[Record] | None = None,
) -> list[Record]:
"""Get points that fall in a rectangle"""
if not points:
points = list()
if not self.boundary.intersects(rect):
return points
for point in self.points:
if rect.contains(point):
points.append(point)
if self.divided:
points = self.northwest.query(rect, points)
points = self.northeast.query(rect, points)
points = self.southwest.query(rect, points)
points = self.southeast.query(rect, points)
return points
def nearby_points(
self,
point: Record,
dist: float,
points: list[Record] | None = None,
) -> list[Record]:
"""Get all points that are nearby another point"""
if not points:
points = list()
if not self.boundary.nearby(point, dist):
return points
for test_point in self.points:
if (
haversine(point.lon, point.lat, test_point.lon, test_point.lat)
<= dist
):
points.append(test_point)
if self.divided:
points = self.northwest.nearby_points(point, dist, points)
points = self.northeast.nearby_points(point, dist, points)
points = self.southwest.nearby_points(point, dist, points)
points = self.southeast.nearby_points(point, dist, points)
return points
import unittest
from datetime import datetime, timedelta
from GeoSpatialTools.octtree import (
OctTree,
SpaceTimeRecord as Record,
SpaceTimeRectangle as Rectangle,
)
class TestRect(unittest.TestCase):
def test_contains(self):
d = datetime(2009, 1, 1, 0, 0)
dt = timedelta(days=14)
rect = Rectangle(10, 5, d, 20, 10, dt)
points: list[Record] = [
Record(10, 5, d),
Record(20, 10, d + timedelta(hours=4)),
Record(20, 10, datetime(2010, 4, 12, 13, 15)),
Record(0, 0, d - timedelta(days=6)),
Record(12.8, 2.1, d + timedelta(days=-1)),
Record(-2, -9.2, d),
]
expected = [True, True, False, True, True, False]
res = list(map(rect.contains, points))
assert res == expected
class TestOctTree(unittest.TestCase):
def test_divides(self):
d = datetime(2023, 3, 24, 12, 0)
dt = timedelta(days=1)
d1 = datetime(2023, 3, 24, 6, 0)
d2 = datetime(2023, 3, 24, 18, 0)
dt2 = timedelta(hours=12)
boundary = Rectangle(10, 4, d, 20, 8, dt)
otree = OctTree(boundary)
expected: list[Rectangle] = [
Rectangle(5, 6, d1, 10, 4, dt2),
Rectangle(15, 6, d1, 10, 4, dt2),
Rectangle(5, 2, d1, 10, 4, dt2),
Rectangle(15, 2, d1, 10, 4, dt2),
Rectangle(5, 6, d2, 10, 4, dt2),
Rectangle(15, 6, d2, 10, 4, dt2),
Rectangle(5, 2, d2, 10, 4, dt2),
Rectangle(15, 2, d2, 10, 4, dt2),
]
otree.divide()
res = [
otree.northwestback.boundary,
otree.northeastback.boundary,
otree.southwestback.boundary,
otree.southeastback.boundary,
otree.northwestfwd.boundary,
otree.northeastfwd.boundary,
otree.southwestfwd.boundary,
otree.southeastfwd.boundary,
]
assert res == expected
def test_insert(self):
d = datetime(2023, 3, 24, 12, 0)
dt = timedelta(days=10)
boundary = Rectangle(10, 4, d, 20, 8, dt)
otree = OctTree(boundary, capacity=3)
points: list[Record] = [
Record(10, 4, d, "main"),
Record(12, 1, d + timedelta(hours=3), "main2"),
Record(3, 7, d - timedelta(days=3), "main3"),
Record(13, 2, d + timedelta(hours=17), "southeastfwd"),
Record(3, 6, d - timedelta(days=1), "northwestback"),
Record(10, 4, d, "northwestback"),
Record(18, 2, d + timedelta(days=23), "not added"),
Record(11, 7, d + timedelta(hours=2), "northeastfwd"),
]
for point in points:
otree.insert(point)
assert otree.divided
expected = [
points[:3],
points[4:6],
[],
[],
[],
[],
[points[-1]],
[],
[points[3]],
]
res = [
otree.points,
otree.northwestback.points,
otree.northeastback.points,
otree.southwestback.points,
otree.southeastback.points,
otree.northwestfwd.points,
otree.northeastfwd.points,
otree.southwestfwd.points,
otree.southeastfwd.points,
]
assert res == expected
def test_query(self):
d = datetime(2023, 3, 24, 12, 0)
dt = timedelta(days=10)
boundary = Rectangle(10, 4, d, 20, 8, dt)
otree = OctTree(boundary, capacity=3)
points: list[Record] = [
Record(10, 4, d, "main"),
Record(12, 1, d + timedelta(hours=3), "main2"),
Record(3, 7, d - timedelta(days=3), "main3"),
Record(13, 2, d + timedelta(hours=17), "southeastfwd"),
Record(3, 6, d - timedelta(days=1), "northwestback"),
Record(10, 4, d, "northwestback"),
Record(18, 2, d + timedelta(days=23), "not added"),
Record(11, 7, d + timedelta(hours=2), "northeastfwd"),
]
for point in points:
otree.insert(point)
test_point = Record(11, 6, d + timedelta(hours=4))
expected = [Record(11, 7, d + timedelta(hours=2), "northeastfwd")]
res = otree.nearby_points(
test_point, dist=200, t_dist=timedelta(hours=5)
)
assert res == expected
if __name__ == "__main__":
unittest.main()
import unittest
from GeoSpatialTools.quadtree import QuadTree, Record, Rectangle
class TestRect(unittest.TestCase):
def test_contains(self):
rect = Rectangle(10, 5, 20, 10)
points: list[Record] = [
Record(10, 5),
Record(20, 10),
Record(0, 0),
Record(12.8, 2.1),
Record(-2, -9.2),
]
expected = [True, True, True, True, False]
res = list(map(rect.contains, points))
assert res == expected
def test_intersection(self):
rect = Rectangle(10, 5, 20, 10)
test_rects: list[Rectangle] = [
Rectangle(10, 5, 18, 8),
Rectangle(25, 5, 9, 12),
Rectangle(15, 8, 12, 7),
]
expected = [True, False, True]
res = list(map(rect.intersects, test_rects))
assert res == expected
class TestQuadTree(unittest.TestCase):
def test_divides(self):
boundary = Rectangle(10, 4, 20, 8)
qtree = QuadTree(boundary)
expected: list[Rectangle] = [
Rectangle(5, 6, 10, 4),
Rectangle(15, 6, 10, 4),
Rectangle(5, 2, 10, 4),
Rectangle(15, 2, 10, 4),
]
qtree.divide()
res = [
qtree.northwest.boundary,
qtree.northeast.boundary,
qtree.southwest.boundary,
qtree.southeast.boundary,
]
assert res == expected
def test_insert(self):
boundary = Rectangle(10, 4, 20, 8)
qtree = QuadTree(boundary, capacity=3)
points: list[Record] = [
Record(10, 5),
Record(19, 1),
Record(0, 0),
Record(-2, -9.2),
Record(12.8, 2.1),
]
expected = [
points[:3],
[],
[],
[],
[points[-1]],
]
for point in points:
qtree.insert(point)
assert qtree.divided
res = [
qtree.points,
qtree.northwest.points,
qtree.northeast.points,
qtree.southwest.points,
qtree.southeast.points,
]
assert res == expected
def test_query(self):
boundary = Rectangle(10, 4, 20, 8)
qtree = QuadTree(boundary, capacity=3)
points: list[Record] = [
Record(10, 5),
Record(19, 1),
Record(0, 0),
Record(-2, -9.2),
Record(12.8, 2.1),
]
test_rect = Rectangle(12.5, 2.5, 1, 1)
expected = [Record(12.8, 2.1)]
for point in points:
qtree.insert(point)
res = qtree.query(test_rect)
assert res == expected
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment