Zonal Map Algebra

Definition

Zonal map algebra refers to operations over raster cells based on the definition of a zone. In concept, a zone is like a mask: a raster with a special value designating membership of the cell in the zone. In general, we assume that zones are defined by vector geometries.

Analysis Plan

We will compute average NDVI over the month of May 2018 for two US national parks: Cuyahoga Valley and Indiana Dunes. We will select data from the built-in catalog, join it with park geometries, read tiles for the bands needed, burn-in or rasterize the geometries to tiles, and compute the aggregate.

Get Vector Data

First we download vector from the US National Park Service open data portal, and take a look at the data.

import requests
import folium

nps_filepath = '/tmp/2parks.geojson'
nps_data_query_url = 'https://services1.arcgis.com/fBc8EJBxQRMcHlei/arcgis/rest/services/' \
                     'NPS_Park_Boundaries/FeatureServer/0/query' \
                     '?geometry=-87.601,40.923,-81.206,41.912&inSR=4326&outSR=4326' \
                     "&where=UNIT_TYPE='National Park'&outFields=*&f=geojson"
r = requests.get(nps_data_query_url)
with open(nps_filepath,'wb') as f:
    f.write(r.content)

m = folium.Map()
layer = folium.GeoJson(nps_filepath)
m.fit_bounds(layer.get_bounds())
m.add_child(layer)
m

Now we read the park boundary vector data as a Spark DataFrame using the built-in geojson DataSource. The geometry is very detailed, and the EO cells are relatively coarse. To speed up the processing, the geometry we “simplify” by combining vertices within about 500 meters of each other. For more on this see the section on Shapely support in user defined functions.

park_vector = spark.read.geojson(nps_filepath)

@udf(MultiPolygonUDT())
def simplify(g, tol):
    return g.simplify(tol)

park_vector = park_vector.withColumn('geo_simp', simplify('geometry', lit(0.005))) \
                         .select('geo_simp', 'OBJECTID', 'UNIT_NAME') \
                         .hint('broadcast')

Catalog Read

Both parks are entirely contained in MODIS granule h11 v04. We will simply filter on this granule, rather than using a spatial relation.

cat = spark.read.format('aws-pds-modis-catalog').load().repartition(50)
park_cat = cat \
            .filter(
                    (cat.granule_id == 'h11v04') &
                    (cat.acquisition_date >= lit('2018-05-01')) &
                    (cat.acquisition_date < lit('2018-06-01'))            
                    ) \
            .crossJoin(park_vector)
                
park_cat.printSchema()
root
 |-- product_id: string (nullable = false)
 |-- acquisition_date: timestamp (nullable = false)
 |-- granule_id: string (nullable = false)
 |-- gid: string (nullable = false)
 |-- B01: string (nullable = true)
 |-- B01qa: string (nullable = true)
 |-- B02: string (nullable = true)
 |-- B02qa: string (nullable = true)
 |-- B03: string (nullable = true)
 |-- B03aq: string (nullable = true)
 |-- B04: string (nullable = true)
 |-- B04qa: string (nullable = true)
 |-- B05: string (nullable = true)
 |-- B05qa: string (nullable = true)
 |-- B06: string (nullable = true)
 |-- B06qa: string (nullable = true)
 |-- B07: string (nullable = true)
 |-- B07qa: string (nullable = true)
 |-- geo_simp: multipolygon (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- UNIT_NAME: string (nullable = true)

We will combine the park geometry with the catalog, and read only the bands of interest to compute NDVI, which we discussed in a previous section.

Now we have a dataframe with several months of MODIS data for a single granule. However, the granule covers a great deal of area outside our park boundaries zones. To deal with this we will, first reproject the park geometry to the same CRS as the imagery. Then we will filter to only the tiles intersecting the park zones.

raster_cols = ['B01', 'B02',] # red and near-infrared respectively
park_rf = spark.read.raster(
        park_cat.select(['acquisition_date', 'granule_id'] + raster_cols + park_vector.columns),
        catalog_col_names=raster_cols) \
    .withColumn('park_native', st_reproject('geo_simp', lit('EPSG:4326'), rf_crs('B01'))) \
    .filter(st_intersects('park_native', rf_geometry('B01'))) 

park_rf.printSchema()
root
 |-- B01_path: string (nullable = false)
 |-- B02_path: string (nullable = false)
 |-- B01: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- B02: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- acquisition_date: timestamp (nullable = false)
 |-- granule_id: string (nullable = false)
 |-- geo_simp: multipolygon (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- UNIT_NAME: string (nullable = true)
 |-- park_native: geometry (nullable = true)

Define Zone Tiles

Now we have the vector representation of the park boundary alongside the tiles of red and near infrared bands. Next, we need to create a tile representation of the park to allow us to limit the raster analysis to pixels within the park zone. This is similar to the masking operation demonstrated in Masking. We rasterize the geometries using rf_rasterize: this creates a new tile column aligned with the imagery, and containing the park’s OBJECTID attribute for cells intersecting the zone. Cells outside the park zones have a NoData value.

rf_park_tile = park_rf \
    .withColumn('dims', rf_dimensions('B01')) \
    .withColumn('park_zone_tile', rf_rasterize('park_native', rf_geometry('B01'), 'OBJECTID', 'dims.cols', 'dims.rows')) \
    .persist()

rf_park_tile.printSchema()
root
 |-- B01_path: string (nullable = false)
 |-- B02_path: string (nullable = false)
 |-- B01: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- B02: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- acquisition_date: timestamp (nullable = false)
 |-- granule_id: string (nullable = false)
 |-- geo_simp: multipolygon (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- UNIT_NAME: string (nullable = true)
 |-- park_native: geometry (nullable = true)
 |-- dims: struct (nullable = true)
 |    |-- cols: short (nullable = false)
 |    |-- rows: short (nullable = false)
 |-- park_zone_tile: tile (nullable = true)

Compute Zonal Statistics

We compute NDVI as the normalized difference of near infrared (band 2) and red (band 1). The tiles are masked by the park_zone_tile, limiting the cells to those in the zone. To finish, we compute our desired statistics over the NVDI tiles that are limited by the zone.

from pyspark.sql.functions import col
from pyspark.sql import functions as F

rf_ndvi = rf_park_tile \
    .withColumn('ndvi', rf_normalized_difference('B02', 'B01')) \
    .withColumn('ndvi_masked', rf_mask('ndvi', 'park_zone_tile'))

zonal_mean = rf_ndvi \
        .groupby('OBJECTID', 'UNIT_NAME') \
        .agg(rf_agg_mean('ndvi'))

zonal_mean
OBJECTID UNIT_NAME rf_agg_mean(ndvi)
363 Cuyahoga Valley National Park 0.6280484315737153
416 Indiana Dunes National Park 0.12640338164827297