Zonal Map Algebra
Definition
Zonal map algebra refers to operations over raster cells based on the definition of a zone. In concept, a zone is like a mask: a raster with a special value designating membership of the cell in the zone. In general, we assume that zones are defined by vector geometries.
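To make the idea concrete, here is a minimal sketch of zonal aggregation outside of Spark, using NumPy; the arrays and zone ids below are invented purely for illustration. A zone raster carries a zone id in member cells and a NoData value elsewhere, and a zonal statistic is computed over the value raster per zone id.
import numpy as np
# A small "value" raster (e.g. NDVI) and a "zone" raster of the same shape.
# Zone cells carry a zone id (1 or 2); cells outside any zone hold a NoData marker (-1 here).
values = np.array([[0.2, 0.4, 0.6],
                   [0.1, 0.5, 0.7],
                   [0.3, 0.9, 0.8]])
zones = np.array([[ 1,  1, -1],
                  [ 1,  2,  2],
                  [-1,  2,  2]])
# Zonal mean: average the value raster over the cells belonging to each zone.
for zone_id in np.unique(zones[zones != -1]):
    print(zone_id, values[zones == zone_id].mean())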
Analysis Plan
We will compute average NDVI over the month of May 2018 for two US national parks: Cuyahoga Valley and Indiana Dunes. We will select data from the built-in catalog, join it with park geometries, read tiles for the bands needed, burn-in or rasterize the geometries to tiles, and compute the aggregate.
Get Vector Data
First we download vector data from the US National Park Service open data portal and take a look at it.
import requests
import folium
nps_filepath = '/tmp/2parks.geojson'
nps_data_query_url = 'https://services1.arcgis.com/fBc8EJBxQRMcHlei/arcgis/rest/services/' \
                     'NPS_Park_Boundaries/FeatureServer/0/query' \
                     '?geometry=-87.601,40.923,-81.206,41.912&inSR=4326&outSR=4326' \
                     "&where=UNIT_TYPE='National Park'&outFields=*&f=geojson"
r = requests.get(nps_data_query_url)
with open(nps_filepath, 'wb') as f:
    f.write(r.content)
m = folium.Map()
layer = folium.GeoJson(nps_filepath)
m.fit_bounds(layer.get_bounds())
m.add_child(layer)
m
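Before moving on, it can be worth a quick look at which features the query actually returned. This is just a sketch using the standard json module; it assumes the UNIT_NAME and UNIT_TYPE properties referenced elsewhere in this section are present on each feature.
import json
with open(nps_filepath) as f:
    parks = json.load(f)
# List the name and type of each park feature returned by the query.
for feature in parks['features']:
    props = feature['properties']
    print(props['UNIT_NAME'], '-', props['UNIT_TYPE'])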
Now we read the park boundary vector data as a Spark DataFrame using the built-in geojson DataSource. The geometry is very detailed, and the EO cells are relatively coarse. To speed up the processing, we “simplify” the geometry by combining vertices within about 500 meters of each other. For more on this, see the section on Shapely support in user-defined functions.
from pyspark.sql.functions import udf, lit
from pyrasterframes.rasterfunctions import *        # rf_*/st_* functions used throughout this section
from geomesa_pyspark.types import MultiPolygonUDT   # geometry UDTs bundled with RasterFrames
park_vector = spark.read.geojson(nps_filepath)
@udf(MultiPolygonUDT())
def simplify(g, tol):
    return g.simplify(tol)
park_vector = park_vector.withColumn('geo_simp', simplify('geometry', lit(0.005))) \
                         .select('geo_simp', 'OBJECTID', 'UNIT_NAME') \
                         .hint('broadcast')
Catalog Read
Both parks are entirely contained in MODIS granule h11v04. We will simply filter on this granule rather than using a spatial relation.
cat = spark.read.format('aws-pds-modis-catalog').load().repartition(50)
park_cat = cat \
    .filter(
        (cat.granule_id == 'h11v04') &
        (cat.acquisition_date >= lit('2018-05-01')) &
        (cat.acquisition_date < lit('2018-06-01'))
    ) \
    .crossJoin(park_vector)
park_cat.printSchema()
root
|-- product_id: string (nullable = false)
|-- acquisition_date: timestamp (nullable = false)
|-- granule_id: string (nullable = false)
|-- gid: string (nullable = false)
|-- B01: string (nullable = true)
|-- B01qa: string (nullable = true)
|-- B02: string (nullable = true)
|-- B02qa: string (nullable = true)
|-- B03: string (nullable = true)
|-- B03aq: string (nullable = true)
|-- B04: string (nullable = true)
|-- B04qa: string (nullable = true)
|-- B05: string (nullable = true)
|-- B05qa: string (nullable = true)
|-- B06: string (nullable = true)
|-- B06qa: string (nullable = true)
|-- B07: string (nullable = true)
|-- B07qa: string (nullable = true)
|-- geo_simp: multipolygon (nullable = true)
|-- OBJECTID: long (nullable = true)
|-- UNIT_NAME: string (nullable = true)
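As a quick sanity check (not part of the original analysis), we can confirm that the cross join produced one row per acquisition date for each park, using only standard PySpark aggregation.
from pyspark.sql import functions as F
# Each park should appear once per acquisition date in the filtered catalog.
park_cat.groupBy('UNIT_NAME') \
    .agg(F.countDistinct('acquisition_date').alias('distinct_dates'), F.count('*').alias('rows')) \
    .show()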
We will combine the park geometry with the catalog and read only the bands of interest for computing NDVI, which we discussed in a previous section.
Now we have a DataFrame with a month of MODIS data for a single granule. However, the granule covers a great deal of area outside our park zones. To deal with this, we first reproject the park geometry to the same CRS as the imagery, and then filter to only the tiles intersecting the park zones.
raster_cols = ['B01', 'B02']  # red and near-infrared, respectively
park_rf = spark.read.raster(
        park_cat.select(['acquisition_date', 'granule_id'] + raster_cols + park_vector.columns),
        catalog_col_names=raster_cols) \
    .withColumn('park_native', st_reproject('geo_simp', lit('EPSG:4326'), rf_crs('B01'))) \
    .filter(st_intersects('park_native', rf_geometry('B01')))
park_rf.printSchema()
root
|-- B01_path: string (nullable = false)
|-- B02_path: string (nullable = false)
|-- B01: struct (nullable = true)
| |-- tile_context: struct (nullable = true)
| | |-- extent: struct (nullable = false)
| | | |-- xmin: double (nullable = false)
| | | |-- ymin: double (nullable = false)
| | | |-- xmax: double (nullable = false)
| | | |-- ymax: double (nullable = false)
| | |-- crs: struct (nullable = false)
| | | |-- crsProj4: string (nullable = false)
| |-- tile: tile (nullable = false)
|-- B02: struct (nullable = true)
| |-- tile_context: struct (nullable = true)
| | |-- extent: struct (nullable = false)
| | | |-- xmin: double (nullable = false)
| | | |-- ymin: double (nullable = false)
| | | |-- xmax: double (nullable = false)
| | | |-- ymax: double (nullable = false)
| | |-- crs: struct (nullable = false)
| | | |-- crsProj4: string (nullable = false)
| |-- tile: tile (nullable = false)
|-- acquisition_date: timestamp (nullable = false)
|-- granule_id: string (nullable = false)
|-- geo_simp: multipolygon (nullable = true)
|-- OBJECTID: long (nullable = true)
|-- UNIT_NAME: string (nullable = true)
|-- park_native: geometry (nullable = true)
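If you want to verify that the spatial filter behaved as expected, a plain group-by count shows how many tiles remain for each park; this is an optional check, not part of the original workflow.
# Number of tiles intersecting each park across the month of imagery.
park_rf.groupBy('UNIT_NAME').count().show()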
Define Zone Tiles
Now we have the vector representation of the park boundary alongside the tiles of red and near infrared bands. Next, we need to create a tile representation of the park to allow us to limit the raster analysis to cells within the park zone. This is similar to the masking operation demonstrated in Masking. We rasterize the geometries using rf_rasterize: this creates a new tile column aligned with the imagery, containing the park’s OBJECTID attribute for cells intersecting the zone. Cells outside the park zones have a NoData value.
rf_park_tile = park_rf \
    .withColumn('dims', rf_dimensions('B01')) \
    .withColumn('park_zone_tile',
                rf_rasterize('park_native', rf_geometry('B01'), 'OBJECTID', 'dims.cols', 'dims.rows')) \
    .persist()
rf_park_tile.printSchema()
root
|-- B01_path: string (nullable = false)
|-- B02_path: string (nullable = false)
|-- B01: struct (nullable = true)
| |-- tile_context: struct (nullable = true)
| | |-- extent: struct (nullable = false)
| | | |-- xmin: double (nullable = false)
| | | |-- ymin: double (nullable = false)
| | | |-- xmax: double (nullable = false)
| | | |-- ymax: double (nullable = false)
| | |-- crs: struct (nullable = false)
| | | |-- crsProj4: string (nullable = false)
| |-- tile: tile (nullable = false)
|-- B02: struct (nullable = true)
| |-- tile_context: struct (nullable = true)
| | |-- extent: struct (nullable = false)
| | | |-- xmin: double (nullable = false)
| | | |-- ymin: double (nullable = false)
| | | |-- xmax: double (nullable = false)
| | | |-- ymax: double (nullable = false)
| | |-- crs: struct (nullable = false)
| | | |-- crsProj4: string (nullable = false)
| |-- tile: tile (nullable = false)
|-- acquisition_date: timestamp (nullable = false)
|-- granule_id: string (nullable = false)
|-- geo_simp: multipolygon (nullable = true)
|-- OBJECTID: long (nullable = true)
|-- UNIT_NAME: string (nullable = true)
|-- park_native: geometry (nullable = true)
|-- dims: struct (nullable = true)
| |-- cols: integer (nullable = false)
| |-- rows: integer (nullable = false)
|-- park_zone_tile: tile (nullable = true)
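One way to verify the rasterization is to count how many cells fall inside each park zone. The sketch below assumes rf_data_cells, which counts the non-NoData cells in a tile; grouping by acquisition date as well avoids summing the same zone once per day of imagery.
from pyspark.sql import functions as F
# Total non-NoData cells in the rasterized zone tiles, per park and date.
rf_park_tile \
    .groupBy('UNIT_NAME', 'acquisition_date') \
    .agg(F.sum(rf_data_cells('park_zone_tile')).alias('zone_cells')) \
    .show(4)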
Compute Zonal Statistics
We compute NDVI as the normalized difference of near infrared (band 2) and red (band 1). The tiles are masked by the park_zone_tile, limiting the cells to those in the zone. To finish, we compute our desired statistics over the NDVI tiles that are limited by the zone.
from pyspark.sql.functions import col
from pyspark.sql import functions as F
rf_ndvi = rf_park_tile \
    .withColumn('ndvi', rf_normalized_difference('B02', 'B01')) \
    .withColumn('ndvi_masked', rf_mask('ndvi', 'park_zone_tile'))
zonal_mean = rf_ndvi \
    .groupby('OBJECTID', 'UNIT_NAME') \
    .agg(rf_agg_mean('ndvi_masked'))
zonal_mean
OBJECTID | UNIT_NAME                     | rf_agg_mean(ndvi_masked)
---------|-------------------------------|-------------------------
375      | Indiana Dunes National Park   | 0.48937965870407857
324      | Cuyahoga Valley National Park | 0.6309672224560483
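The same masked tiles support other groupings. For example, grouping by acquisition date as well yields a rough NDVI time series for each park over the month; this is a sketch of a possible extension rather than part of the analysis above.
# Zonal mean NDVI per park and acquisition date, in chronological order.
zonal_daily = rf_ndvi \
    .groupby('UNIT_NAME', 'acquisition_date') \
    .agg(rf_agg_mean('ndvi_masked').alias('mean_ndvi')) \
    .orderBy('acquisition_date')
zonal_daily.show()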