# Vector Data
RasterFrames provides a variety of ways to work with spatial vector data (points, lines, and polygons) alongside raster data.
- DataSource for GeoJSON format
- Ability to convert between GeoPandas and Spark DataFrames
- In PySpark, geometries are Shapely objects, providing a great deal of interoperability
- Many Spark functions for working with columns of geometries
- Vector data is also the basis for zonal map algebra operations.
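The examples below assume a SparkSession with RasterFrames enabled is available as `spark`. A minimal setup sketch, assuming the `create_rf_spark_session` convenience helper from `pyrasterframes.utils`:

```python
# Minimal setup sketch: a Spark session with RasterFrames enabled.
# Assumes the create_rf_spark_session helper from pyrasterframes.utils.
from pyrasterframes.utils import create_rf_spark_session
spark = create_rf_spark_session()
```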
## GeoJSON DataSource
```python
from pyspark import SparkFiles
admin1_us_url = 'https://raw.githubusercontent.com/datasets/geo-admin1-us/master/data/admin1-us.geojson'
spark.sparkContext.addFile(admin1_us_url)  # distributes the file so Spark can read the HTTP URI
df = spark.read.geojson(SparkFiles.get('admin1-us.geojson'))
df.printSchema()
```
```
root
 |-- geometry: geometry (nullable = true)
 |-- ISO3166-1-Alpha-3: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state_code: string (nullable = true)
```
The properties of each discrete geometry are available as columns of the DataFrame, along with the geometry itself.
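Since the properties are ordinary columns, they work with all the usual DataFrame operations. A small illustrative sketch (the filter value is arbitrary):

```python
# Filter on a property column and select a subset of columns.
df.filter(df.state_code == 'RI').select('name', 'state_code').show()
```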
## GeoPandas and RasterFrames
You can also convert a GeoPandas GeoDataFrame to a Spark DataFrame, preserving the geometry column. This means that any vector format that can be read with OGR can be converted to a Spark DataFrame. In the example below, we expect the same schema as the DataFrame created above by the GeoJSON reader. Note that a GeoPandas DataFrame can contain heterogeneous geometry types in a single column, which can cause Spark's schema inference to fail; the example below normalizes all geometries to MultiPolygons to avoid this.
```python
import geopandas
from shapely.geometry import MultiPolygon

def poly_or_mp_to_mp(g):
    """Normalize Polygons or MultiPolygons to all be MultiPolygons."""
    if isinstance(g, MultiPolygon):
        return g
    else:
        return MultiPolygon([g])

gdf = geopandas.read_file(admin1_us_url)
gdf.geometry = gdf.geometry.apply(poly_or_mp_to_mp)
df2 = spark.createDataFrame(gdf)
df2.printSchema()
```
```
root
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- ISO3166-1-Alpha-3: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- geometry: multipolygon (nullable = true)
```
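The conversion also works in the other direction for data small enough to collect: geometries come back as Shapely objects (see the next section), so the result can be wrapped in a GeoDataFrame. A sketch, assuming the data fits on the driver and is in WGS84 (the `crs` value is an assumption):

```python
# Sketch: round-trip a Spark DataFrame back to GeoPandas.
pdf = df2.toPandas()  # geometry column holds Shapely objects
gdf2 = geopandas.GeoDataFrame(pdf, geometry='geometry', crs='EPSG:4326')  # crs assumed
```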
## Shapely Geometry Support
The `geometry` column will have a Spark user-defined type that is compatible with Shapely when working with Python via PySpark. This means that when the data is collected to the driver, it will be a Shapely geometry object.
```python
the_first = df.first()
print(type(the_first['geometry']))
```

```
<class 'shapely.geometry.polygon.Polygon'>
```
Since it is a Shapely geometry, we can do things like access its well-known text (WKT) representation:
```python
the_first['geometry'].wkt
```

```
'POLYGON ((-71.14789974636884 41.64758738867177, -71.1203820461734 41.49465098730397, -71.85382564969197 41.32003632258973, -71.79295081245215 41.46661652278563, -71.8009089830251 42.01324982356905, -71.37915178087496 42.02436025651181, -71.30507361518457 41.76241242122431, -71.14789974636884 41.64758738867177))'
```
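Any other Shapely attribute or method is available on the collected object as well; a brief sketch:

```python
# Ordinary Shapely operations work on the collected geometry.
g = the_first['geometry']
print(g.area)                  # area in square degrees (coordinates are WGS84)
print(g.centroid.wkt)          # centroid as WKT
print(g.contains(g.centroid))  # spatial predicate
```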
You can also write user-defined functions that take geometries as input, output, or both, via the user-defined types in the `geomesa_pyspark.types` module. Here is a simple but inefficient example of a user-defined function with a geometry as both input and output, computing the centroid of a geometry. Observe in the sample of the data below that the geometry columns print as well-known text (WKT).
```python
from pyspark.sql.functions import udf
from geomesa_pyspark.types import PointUDT

@udf(PointUDT())
def inefficient_centroid(g):
    return g.centroid

df.select(df.state_code, inefficient_centroid(df.geometry))
```
Showing only top 5 rows.
| state_code | inefficient_centroid(geometry) |
| --- | --- |
| RI | POINT (-71.52928731203043 41.68199156750… |
| FL | POINT (-82.50257410032164 28.61692830512… |
| OK | POINT (-97.5041982257254 35.581496016431… |
| MN | POINT (-94.17743611908642 46.36007316339… |
| TX | POINT (-99.3286756609654 31.455508256185… |
## GeoMesa Functions and Spatial Relations
As documented in the function reference, various user-defined functions implemented by GeoMesa are also available for use. The example below uses a GeoMesa user-defined function to compute the centroid of a geometry. It is logically equivalent to the example above, but more efficient: it executes in the JVM, avoiding the per-row serialization to and from a Python process that a Python UDF incurs.
```python
from pyrasterframes.rasterfunctions import st_centroid
df.select(df.state_code, inefficient_centroid(df.geometry), st_centroid(df.geometry))
```
Showing only top 5 rows.
| state_code | inefficient_centroid(geometry) | st_centroid(geometry) |
| --- | --- | --- |
| RI | POINT (-71.52928731203043 41.68199156750… | POINT (-71.52928731203043 41.68199156750… |
| FL | POINT (-82.50257410032164 28.61692830512… | POINT (-82.50257410032164 28.61692830512… |
| OK | POINT (-97.5041982257254 35.581496016431… | POINT (-97.5041982257254 35.581496016431… |
| MN | POINT (-94.17743611908642 46.36007316339… | POINT (-94.17743611908642 46.36007316339… |
| TX | POINT (-99.3286756609654 31.455508256185… | POINT (-99.3286756609654 31.455508256185… |
The RasterFrames vector functions and GeoMesa functions also provide a variety of spatial relations that are useful in combination with the geometric properties of projected rasters. In this example, we use the built-in Landsat catalog, which provides an extent for each scene. We will convert the extent to a polygon and filter to those scenes whose extents intersect a buffer of approximately 50 km around a selected point, within a date range.
```python
from pyrasterframes.rasterfunctions import st_geometry, st_bufferPoint, st_intersects, st_point
from pyspark.sql.functions import lit

l8 = spark.read.format('aws-pds-l8-catalog').load()
l8 = l8.withColumn('geom', st_geometry(l8.bounds_wgs84))            # extent to polygon
l8 = l8.withColumn('paducah', st_point(lit(-88.628), lit(37.072)))  # column of points

l8_filtered = l8 \
    .filter(st_intersects(l8.geom, st_bufferPoint(l8.paducah, lit(50000.0)))) \
    .filter(l8.acquisition_date > '2018-02-01') \
    .filter(l8.acquisition_date < '2018-03-11')
```
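From here, the filtered catalog can be inspected before reading any imagery. A small sketch using only columns already referenced above (`acquisition_date`, `bounds_wgs84`):

```python
# Sketch: inspect which scenes passed the spatial and temporal filters.
print(l8_filtered.count())
l8_filtered.select('acquisition_date', 'bounds_wgs84').show(5, truncate=False)
```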