Vector Data

RasterFrames provides a variety of ways to work with spatial vector data (points, lines, and polygons) alongside raster data.

  • DataSource for GeoJSON format
  • Ability to convert between GeoPandas and Spark DataFrames
  • In PySpark, geometries are Shapely objects, providing a great deal of interoperability
  • Many Spark functions for working with columns of geometries
  • Vector data as the basis for zonal map algebra operations

GeoJSON DataSource

from pyspark import SparkFiles
admin1_us_url = 'https://raw.githubusercontent.com/datasets/geo-admin1-us/master/data/admin1-us.geojson'
spark.sparkContext.addFile(admin1_us_url)  # this lets us read http-scheme URIs in Spark

df = spark.read.geojson(SparkFiles.get('admin1-us.geojson'))
df.printSchema()
root
 |-- geometry: geometry (nullable = true)
 |-- ISO3166-1-Alpha-3: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state_code: string (nullable = true)

The properties of each discrete geometry are available as columns of the DataFrame, along with the geometry itself.
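
Because the properties are ordinary columns, standard DataFrame operations apply to them. For example, to peek at a few property columns (names as in the schema above):

df.select('state_code', 'name', 'country').show(3)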

GeoPandas and RasterFrames

You can also convert a GeoPandas GeoDataFrame to a Spark DataFrame, preserving the geometry column. This means that any vector format that can be read with OGR can be converted to a Spark DataFrame. In the example below, we expect the same schema as the DataFrame created above by the GeoJSON reader. Note that a GeoPandas geometry column can contain heterogeneous geometry types (here, both Polygon and MultiPolygon), which can cause Spark's schema inference to fail, so we first normalize the column to a single type.

import geopandas
from shapely.geometry import MultiPolygon

def poly_or_mp_to_mp(g):
    """ Normalize polygons or multipolygons to all be multipolygons. """
    if isinstance(g, MultiPolygon):
        return g
    else:
        return MultiPolygon([g])

gdf = geopandas.read_file(admin1_us_url)
gdf.geometry = gdf.geometry.apply(poly_or_mp_to_mp)
df2 = spark.createDataFrame(gdf)
df2.printSchema()
root
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- ISO3166-1-Alpha-3: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- geometry: multipolygon (nullable = true)
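
The conversion also works in reverse: collecting the Spark DataFrame to pandas yields Shapely objects in the geometry column (see the next section), which GeoPandas can adopt directly. A minimal sketch, assuming the default (non-Arrow) collection path for the geometry user-defined type:

import geopandas

# Collect to the driver; the geometry column deserializes to Shapely objects.
pdf = df2.toPandas()

# Promote to a GeoDataFrame; GeoJSON coordinates are WGS84 (EPSG:4326).
gdf2 = geopandas.GeoDataFrame(pdf, geometry='geometry', crs='EPSG:4326')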

Shapely Geometry Support

The geometry column has a Spark user-defined type that is compatible with Shapely when working in PySpark. This means that when the data is collected to the driver, it will be a Shapely geometry object.

the_first = df.first()
print(type(the_first['geometry']))
<class 'shapely.geometry.polygon.Polygon'>

Since it is a Shapely geometry, we can use its properties and methods directly:

the_first['geometry'].wkt
'POLYGON ((-71.14789974636884 41.64758738867177, -71.1203820461734 41.49465098730397, -71.85382564969197 41.32003632258973, -71.79295081245215 41.46661652278563, -71.8009089830251 42.01324982356905, -71.37915178087496 42.02436025651181, -71.30507361518457 41.76241242122431, -71.14789974636884 41.64758738867177))'
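
Any other Shapely property or method is likewise available, for example:

geom = the_first['geometry']
print(geom.area)                     # planar area; square degrees for lon/lat coordinates
print(geom.bounds)                   # (minx, miny, maxx, maxy) bounding box
print(geom.contains(geom.centroid))  # a spatial predicate evaluated locally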

You can also write user-defined functions that take geometries as input, output, or both, via the user-defined types in the geomesa_pyspark.types module. Here is a simple but inefficient example of a user-defined function that uses both a geometry input and output to compute the centroid of a geometry. Observe that in the sample of the data below, geometry columns print as well-known text (WKT).

from pyspark.sql.functions import udf
from geomesa_pyspark.types import PointUDT

@udf(PointUDT())
def inefficient_centroid(g):
    return g.centroid

df.select(df.state_code, inefficient_centroid(df.geometry))

Showing only top 5 rows.

state_code  inefficient_centroid(geometry)
RI          POINT (-71.52928731203043 41.68199156750…
FL          POINT (-82.50257410032164 28.61692830512…
OK          POINT (-97.5041982257254 35.581496016431…
MN          POINT (-94.17743611908642 46.36007316339…
TX          POINT (-99.3286756609654 31.455508256185…
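
A user-defined function can also return an ordinary Spark type. As an illustrative sketch (the function name here is hypothetical), a UDF taking a geometry input and producing a double output:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(DoubleType())
def geom_area(g):
    # Shapely computes planar area; with lon/lat coordinates the units are square degrees.
    return float(g.area)

df.select(df.state_code, geom_area(df.geometry))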

GeoMesa Functions and Spatial Relations

As documented in the function reference, various user-defined functions implemented by GeoMesa are also available for use. The example below uses a GeoMesa user-defined function to compute the centroid of a geometry. It is logically equivalent to the example above, but more efficient.

from pyrasterframes.rasterfunctions import st_centroid
df.select(df.state_code, inefficient_centroid(df.geometry), st_centroid(df.geometry))

Showing only top 5 rows.

state_code  inefficient_centroid(geometry)             st_centroid(geometry)
RI          POINT (-71.52928731203043 41.68199156750…  POINT (-71.52928731203043 41.68199156750…
FL          POINT (-82.50257410032164 28.61692830512…  POINT (-82.50257410032164 28.61692830512…
OK          POINT (-97.5041982257254 35.581496016431…  POINT (-97.5041982257254 35.581496016431…
MN          POINT (-94.17743611908642 46.36007316339…  POINT (-94.17743611908642 46.36007316339…
TX          POINT (-99.3286756609654 31.455508256185…  POINT (-99.3286756609654 31.455508256185…

The RasterFrames vector functions and GeoMesa functions also provide a variety of spatial relations that are useful in combination with the geometric properties of projected rasters. In this example, we use the built-in Landsat catalog, which provides each scene's extent in the bounds_wgs84 column. We convert the extent to a polygon and filter to scenes within approximately 50 km of a selected point.

from pyrasterframes.rasterfunctions import st_geometry, st_bufferPoint, st_intersects, st_point
from pyspark.sql.functions import lit
l8 = spark.read.format('aws-pds-l8-catalog').load()

l8 = l8.withColumn('geom', st_geometry(l8.bounds_wgs84))  # extent to polygon
l8 = l8.withColumn('paducah', st_point(lit(-88.628), lit(37.072)))  # col of points

# st_bufferPoint buffers the point by a radius in meters (50,000 m ≈ 50 km)
l8_filtered = l8 \
                .filter(st_intersects(l8.geom, st_bufferPoint(l8.paducah, lit(50000.0)))) \
                .filter(l8.acquisition_date > '2018-02-01') \
                .filter(l8.acquisition_date < '2018-03-11')
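
From here, the filtered catalog can be inspected before any raster reads are issued. A quick sanity check, using only columns already referenced above:

# How many scenes pass the spatial and temporal filters?
print(l8_filtered.count())

# Peek at the acquisition dates of the matching scenes.
l8_filtered.select('acquisition_date').show()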