Time Series

Analysis Plan

In this example, we will show how the flexibility of the DataFrame concept for raster data allows a simple and intuitive way to extract a time series from Earth observation data. We will start with our built-in MODIS data catalog.
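
The snippets below assume a Spark session with the RasterFrames extensions enabled and the RasterFrames column functions in scope. A minimal setup sketch, assuming a local pyrasterframes installation (adjust the session creation to your own cluster), might look like this:

from pyrasterframes.utils import create_rf_spark_session
from pyrasterframes.rasterfunctions import *   # the rf_* and st_* functions used throughout
from pyspark.sql.functions import udf, lit

# Create a SparkSession with the RasterFrames extensions and DataSources registered
spark = create_rf_spark_session()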

cat = spark.read.format('aws-pds-modis-catalog').load().repartition(200)
cat.printSchema()
root
 |-- product_id: string (nullable = false)
 |-- acquisition_date: timestamp (nullable = false)
 |-- granule_id: string (nullable = false)
 |-- gid: string (nullable = false)
 |-- B01: string (nullable = true)
 |-- B01qa: string (nullable = true)
 |-- B02: string (nullable = true)
 |-- B02qa: string (nullable = true)
 |-- B03: string (nullable = true)
 |-- B03aq: string (nullable = true)
 |-- B04: string (nullable = true)
 |-- B04qa: string (nullable = true)
 |-- B05: string (nullable = true)
 |-- B05qa: string (nullable = true)
 |-- B06: string (nullable = true)
 |-- B06qa: string (nullable = true)
 |-- B07: string (nullable = true)
 |-- B07qa: string (nullable = true)

We will summarize the change in NDVI over 2018 in the Cuyahoga Valley National Park in Ohio, USA. First, we will retrieve open vector data delineating the park boundary from the US National Park Service’s LandsNet.

Vector Data

First we will get the vector data from the LandsNet service via a REST query and save it to a GeoJSON file.

import folium
import requests

nps_filepath = '/tmp/parks.geojson'
nps_data_query_url = 'https://services1.arcgis.com/fBc8EJBxQRMcHlei/arcgis/rest/services/' \
                     'NPS_Park_Boundaries/FeatureServer/0/query' \
                     '?geometry=-82.451,41.075,-80.682,41.436&inSR=4326&outSR=4326&f=geojson'
r = requests.get(nps_data_query_url)
with open(nps_filepath, 'wb') as f:
    f.write(r.content)
m = folium.Map((41.25, -81.6), zoom_start=10).add_child(folium.GeoJson(nps_filepath))
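
The folium map renders inline in a notebook. If you are running this as a plain script, one option is to write it out as a standalone HTML file (the file name here is arbitrary):

# Save the interactive map for viewing in a browser outside a notebook
m.save('/tmp/park_boundary_map.html')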

Now we read the park boundary vector data as a Spark DataFrame using the built-in geojson DataSource. The geometry is very detailed, and the EO cells are relatively coarse. To speed up the processing, the geometry is “simplified” by combining vertices within about 100 meters of each other. For more on this see the section on Shapely support in user defined functions.

from geomesa_pyspark.types import MultiPolygonUDT  # geometry UDT provided via the GeoMesa Spark types that ship with pyrasterframes

park_vector = spark.read.geojson(nps_filepath)

@udf(MultiPolygonUDT())
def simplify(g, tol):
    # g arrives as a Shapely geometry, so we can call Shapely's simplify directly
    return g.simplify(tol)

park_vector = park_vector.withColumn('geo_simp', simplify('geometry', lit(0.001))) \
                         .select('geo_simp') \
                         .hint('broadcast')
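
To get a feel for what the 0.001 degree (roughly 100 meter) tolerance does, here is a small local illustration using plain Shapely outside of Spark, on a synthetic geometry rather than the park boundary:

from shapely.geometry import Point

# A detailed ring: a circle of radius 0.05 degrees approximated by about a thousand vertices
detailed = Point(0, 0).buffer(0.05, 256)
# Douglas-Peucker simplification with the same tolerance used above
simpler = detailed.simplify(0.001)
print(len(detailed.exterior.coords), '->', len(simpler.exterior.coords))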

Catalog Read

The entire park boundary is contained in MODIS granule h11v04. We will simply filter on this granule rather than using a spatial relation. The time period selected should show the change in plant vigor as leaves emerge over the spring and into early summer.

park_cat = cat \
            .filter(
                    (cat.granule_id == 'h11v04') &
                    (cat.acquisition_date > lit('2018-02-19')) &
                    (cat.acquisition_date < lit('2018-07-01'))            
                    ) \
            .crossJoin(park_vector)
                
park_cat.printSchema()
root
 |-- product_id: string (nullable = false)
 |-- acquisition_date: timestamp (nullable = false)
 |-- granule_id: string (nullable = false)
 |-- gid: string (nullable = false)
 |-- B01: string (nullable = true)
 |-- B01qa: string (nullable = true)
 |-- B02: string (nullable = true)
 |-- B02qa: string (nullable = true)
 |-- B03: string (nullable = true)
 |-- B03aq: string (nullable = true)
 |-- B04: string (nullable = true)
 |-- B04qa: string (nullable = true)
 |-- B05: string (nullable = true)
 |-- B05qa: string (nullable = true)
 |-- B06: string (nullable = true)
 |-- B06qa: string (nullable = true)
 |-- B07: string (nullable = true)
 |-- B07qa: string (nullable = true)
 |-- geo_simp: multipolygon (nullable = true)
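
Before reading any imagery, it can be useful to confirm how many scenes the date filter selected, using ordinary Spark operations (output not shown):

# Number of catalog rows (MODIS scenes) selected for granule h11v04 in the date range
print(park_cat.count())
park_cat.select('acquisition_date').orderBy('acquisition_date').show(5)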

Now we have a catalog with several months of MODIS data for a single granule. However, the granule is larger than our park boundary. We will combine the park geometry with the catalog, and read only the bands of interest to compute NDVI, which we discussed in a previous section.

We also reproject the park geometry to the same CRS as the imagery and then filter to only the tiles intersecting the park.

raster_cols = ['B01', 'B02',] # red and near-infrared respectively
park_rf = spark.read.raster(
        catalog=park_cat.select(['acquisition_date', 'granule_id', 'geo_simp'] + raster_cols),
        catalog_col_names=raster_cols) \
    .withColumn('park_native', st_reproject('geo_simp', lit('EPSG:4326'), rf_crs('B01'))) \
    .filter(st_intersects('park_native', rf_geometry('B01'))) 

park_rf.printSchema()
root
 |-- B01_path: string (nullable = false)
 |-- B02_path: string (nullable = false)
 |-- B01: struct (nullable = true)
 |    |-- tile_context: struct (nullable = false)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- B02: struct (nullable = true)
 |    |-- tile_context: struct (nullable = false)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- acquisition_date: timestamp (nullable = false)
 |-- granule_id: string (nullable = false)
 |-- geo_simp: multipolygon (nullable = true)
 |-- park_native: geometry (nullable = true)

Vector and Raster Data Interaction

Now we have the vector representation of the park boundary alongside the tiles of red and near infrared bands. Next, we need to create a tile representation of the park to allow us to limit the time series analysis to pixels within the park. This is similar to the masking operation demonstrated in NoData handling.

The park boundary has already been reprojected from geographic coordinates into the MODIS sinusoidal projection (the park_native column above). The remaining step is to rasterize it: we create a new tile, aligned with the imagery, containing the value 1 where the pixels are contained within the park and NoData elsewhere.

rf_park_tile = park_rf \
    .withColumn('dims', rf_dimensions('B01')) \
    .withColumn('park_tile', rf_rasterize('park_native', rf_geometry('B01'), lit(1), 'dims.cols', 'dims.rows')) \
    .persist()

rf_park_tile.printSchema()
root
 |-- B01_path: string (nullable = false)
 |-- B02_path: string (nullable = false)
 |-- B01: struct (nullable = true)
 |    |-- tile_context: struct (nullable = false)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- B02: struct (nullable = true)
 |    |-- tile_context: struct (nullable = false)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- acquisition_date: timestamp (nullable = false)
 |-- granule_id: string (nullable = false)
 |-- geo_simp: multipolygon (nullable = true)
 |-- park_native: geometry (nullable = true)
 |-- dims: struct (nullable = true)
 |    |-- cols: short (nullable = false)
 |    |-- rows: short (nullable = false)
 |-- park_tile: tile (nullable = true)
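
As a quick check on the rasterization, you can count how many cells in each park_tile were marked as inside the boundary; rf_data_cells counts a tile's non-NoData cells (output not shown):

# Per-tile count of cells that fall inside the park boundary
rf_park_tile.select(rf_data_cells('park_tile').alias('cells_in_park')).show(5)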

Create Time Series

Next, we will compute NDVI as the normalized difference of near infrared (band 2) and red (band 1). The tiles are masked by park_tile. We will then aggregate over the remaining values to arrive at an average NDVI for each week of the year. Note that this is effectively a weighted average: each tile contributes its NDVI sum and its count of valid observations, so the weekly value is the mean over all valid in-park pixels observed that week.

from pyspark.sql.functions import col, year, weekofyear, month
from pyspark.sql.functions import sum as sql_sum

rf_ndvi = rf_park_tile \
    .withColumn('ndvi', rf_normalized_difference('B02', 'B01')) \
    .withColumn('ndvi_masked', rf_mask('ndvi', 'park_tile'))

time_series = rf_ndvi \
        .withColumn('ndvi_wt', rf_tile_sum('ndvi_masked')) \
        .withColumn('wt', rf_data_cells('ndvi_masked')) \
        .groupby(year('acquisition_date').alias('year'), weekofyear('acquisition_date').alias('week')) \
        .agg(sql_sum('ndvi_wt').alias('ndvi_wt_wk'), sql_sum('wt').alias('wt_wk')) \
        .withColumn('ndvi', col('ndvi_wt_wk') / col('wt_wk'))
        
time_series.printSchema()
root
 |-- year: integer (nullable = false)
 |-- week: integer (nullable = false)
 |-- ndvi_wt_wk: double (nullable = true)
 |-- wt_wk: long (nullable = true)
 |-- ndvi: double (nullable = true)
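
Before plotting, it can help to preview a few rows of the weekly aggregate to confirm the values look reasonable (output not shown):

# Inspect the first few weeks of the aggregated series
time_series.orderBy('year', 'week').show(5)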

Finally, we will take a look at the NDVI over time.

import matplotlib.pyplot as plt

time_series_pdf = time_series.toPandas()
time_series_pdf = time_series_pdf.sort_values('week')
plt.plot(time_series_pdf['week'], time_series_pdf['ndvi'], 'go-')
plt.xlabel('Week of year, 2018')
plt.ylabel('NDVI')
plt.title('Cuyahoga Valley NP Green-up')

We can see two fairly clear elbows in the curve at week 17 and week 21, indicating the start and end of the green-up period. Estimation of such parameters is one technique phenology researchers use to monitor changes in climate and environment.