Vector Data

RasterFrames provides a variety of ways to work with spatial vector data (points, lines, and polygons) alongside raster data. There is a convenience DataSource for the GeoJSON format, as well as the ability to convert from GeoPandas to Spark. Representation of vector geometries in PySpark is through Shapely, providing a great deal of interoperability. RasterFrames also provides access to Spark functions for working with geometries.

GeoJSON DataSource

from pyspark import SparkFiles
spark.sparkContext.addFile('https://raw.githubusercontent.com/datasets/geo-admin1-us/master/data/admin1-us.geojson')

df = spark.read.geojson(SparkFiles.get('admin1-us.geojson'))
df.printSchema()
root
 |-- geometry: geometry (nullable = true)
 |-- ISO3166-1-Alpha-3: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state_code: string (nullable = true)

The properties of each discrete geometry are available as columns of the DataFrame, along with the geometry itself.

GeoPandas and RasterFrames

You can also convert a GeoPandas GeoDataFrame to a Spark DataFrame, preserving the geometry column. This means that any vector format that can be read with OGR can be converted to a Spark DataFrame. In the example below, we expect the same schema as the DataFrame defined above by the GeoJSON reader. Note that in a GeoPandas DataFrame there can be heterogeneous geometry types in the column, which may fail Spark’s schema inference.

import geopandas
from shapely.geometry import MultiPolygon

def poly_or_mp_to_mp(g):
    """ Normalize polygons or multipolygons to all be multipolygons. """
    if isinstance(g, MultiPolygon):
        return g
    else:
        return MultiPolygon([g])

gdf = geopandas.read_file('https://raw.githubusercontent.com/datasets/geo-admin1-us/master/data/admin1-us.geojson')
gdf.geometry = gdf.geometry.apply(poly_or_mp_to_mp)
df2 = spark.createDataFrame(gdf)
df2.printSchema()
root
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- ISO3166-1-Alpha-3: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- __geom__: long (nullable = true)
 |    |-- _is_empty: boolean (nullable = true)
 |    |-- _ndim: long (nullable = true)

Shapely Geometry Support

The geometry column will have a Spark user-defined type that is compatible with Shapely when working with Python via PySpark. This means that when the data is collected to the driver, it will be a Shapely geometry object.

the_first = df.first()
print(type(the_first['geometry']))
<class 'shapely.geometry.polygon.Polygon'>

Since it is a geometry we can do things like this:

the_first['geometry'].wkt
'POLYGON ((-71.14789974636884 41.64758738867177, -71.1203820461734 41.49465098730397, -71.85382564969197 41.32003632258973, -71.79295081245215 41.46661652278563, -71.8009089830251 42.01324982356905, -71.37915178087496 42.02436025651181, -71.30507361518457 41.76241242122431, -71.14789974636884 41.64758738867177))'

You can also write user-defined functions that take geometries as input, output, or both, via user defined types in the geomesa_pyspark.types module. Here is a simple but inefficient example of a user-defined function that uses both a geometry input and output to compute the centroid of a geometry. Observe in a sample of the data the geometry columns print as well known text (wkt).

from pyspark.sql.functions import udf
from geomesa_pyspark.types import PointUDT

@udf(PointUDT())
def inefficient_centroid(g):
    return g.centroid

df.select(df.state_code, inefficient_centroid(df.geometry))

Showing only top 5 rows.

state_code inefficient_centroid(geometry)
RI POINT (-71.52928731203043 41.68199156750224)
FL POINT (-82.50257410032164 28.61692830512151)
OK POINT (-97.5041982257254 35.58149601643106)
MN POINT (-94.17743611908642 46.360073163397246)
TX POINT (-99.3286756609654 31.45550825618542)

GeoMesa Functions and Spatial Relations

As documented in the function reference, various user-defined functions implemented by GeoMesa are also available for use. The example below uses a GeoMesa user-defined function to compute the centroid of a geometry. It is logically equivalent to the example above, but more efficient.

from pyrasterframes.rasterfunctions import st_centroid
df.select(df.state_code, inefficient_centroid(df.geometry), st_centroid(df.geometry))

Showing only top 5 rows.

state_code inefficient_centroid(geometry) st_centroid(geometry)
RI POINT (-71.52928731203043 41.68199156750224) POINT (-71.52928731203043 41.68199156750224)
FL POINT (-82.50257410032164 28.61692830512151) POINT (-82.50257410032164 28.61692830512151)
OK POINT (-97.5041982257254 35.58149601643106) POINT (-97.5041982257254 35.58149601643106)
MN POINT (-94.17743611908642 46.360073163397246) POINT (-94.17743611908642 46.360073163397246)
TX POINT (-99.3286756609654 31.45550825618542) POINT (-99.3286756609654 31.45550825618542)

The RasterFrames vector functions and GeoMesa functions also provide a variety of spatial relations that are useful in combination with the geometric properties of projected rasters. In this example, we use the built-in Landsat catalog which provides an extent. We will convert the extent to a polygon and filter to those within approximately 500 km of a selected point.

from pyrasterframes.rasterfunctions import st_geometry, st_bufferPoint, st_intersects, st_point
from pyspark.sql.functions import lit
l8 = spark.read.format('aws-pds-l8-catalog').load()

l8 = l8.withColumn('geom', st_geometry(l8.bounds_wgs84))
l8 = l8.withColumn('paducah', st_point(lit(-88.6275), lit(37.072222)))

l8_filtered = l8.filter(st_intersects(l8.geom, st_bufferPoint(l8.paducah, lit(500000.0))))
l8_filtered.select('product_id', 'entity_id', 'acquisition_date', 'cloud_cover_pct')

Showing only top 5 rows.

product_id entity_id acquisition_date cloud_cover_pct
LC08_L1TP_020035_20130321_20170310_01_T1 LC80200352013080LGN02 2013-03-21 16:20:35.061 9.75
LC08_L1TP_026032_20130327_20170310_01_T1 LC80260322013086LGN02 2013-03-27 16:53:44.821 36.07
LC08_L1TP_022037_20130329_20170310_01_T1 LC80220372013088LGN02 2013-03-29 16:34:19.036 63.69
LC08_L1TP_021035_20130408_20170310_01_T1 LC80210352013098LGN02 2013-04-08 16:27:08.07 90.01
LC08_L1TP_023033_20130413_20170310_01_T1 LC80230332013103LGN03 2013-04-13 16:37:41.867 2.4