Scala and SQL

One of the great strengths of RasterFrames is the ability to express computation in multiple programming languages. The content in this manual focuses on Python because it is the most commonly used language in data science and GIS analytics. However, Scala (the implementation language of RasterFrames) and SQL (commonly used in many domains) are also fully supported. Examples in Python can be translated mechanically into the other two languages once the naming conventions are understood.

In the sections below we show the same example program in each language. It computes the average NDVI per month in 2017 for a single MODIS tile in Tanzania.
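NDVI is the normalized difference of the near-infrared and red bands, (NIR - red) / (NIR + red), computed cell by cell. As a plain-Python sketch (not RasterFrames code), assuming a tile is a flat list of cell values with None marking NoData, the per-cell computation looks roughly like this. Treating a zero denominator as NoData is a choice made for this sketch only.

```python
from typing import List, Optional

Cell = Optional[float]  # None marks a NoData cell in this sketch


def normalized_difference(nir: List[Cell], red: List[Cell]) -> List[Cell]:
    """Per-cell (nir - red) / (nir + red); NoData in either band, or a zero sum, gives NoData."""
    if len(nir) != len(red):
        raise ValueError("tiles must have the same number of cells")
    result: List[Cell] = []
    for n, r in zip(nir, red):
        if n is None or r is None or n + r == 0:
            result.append(None)
        else:
            result.append((n - r) / (n + r))
    return result


# Usage: equal bands give 0.0, and a red value of 0 gives the maximum NDVI of 1.0.
print(normalized_difference([3.0, 1.0, None, 2.0], [1.0, 1.0, 2.0, 0.0]))  # prints [0.5, 0.0, None, 1.0]
```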

Python

Step 1: Load the catalog

modis = spark.read.format('aws-pds-modis-catalog').load()

Step 2: Down-select data by month

from pyspark.sql.functions import col, dayofmonth, month, year

red_nir_monthly_2017 = modis \
    .select(
        col('granule_id'),
        month('acquisition_date').alias('month'),
        col('B01').alias('red'),
        col('B02').alias('nir')
    ) \
    .where(
        (year('acquisition_date') == 2017) & 
        (dayofmonth('acquisition_date') == 15) & 
        (col('granule_id') == 'h21v09')
    )
red_nir_monthly_2017.printSchema()
root
 |-- granule_id: string (nullable = false)
 |-- month: integer (nullable = false)
 |-- red: string (nullable = true)
 |-- nir: string (nullable = true)
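The `where` clause keeps only scenes of granule h21v09 acquired on the 15th of a month in 2017. A plain-Python sketch of the same predicate, applied to a few hypothetical catalog rows (the real catalog rows also carry the band columns that become `red` and `nir`, which are omitted here):

```python
from datetime import date
from typing import Dict, List


def select_monthly(rows: List[Dict], year: int = 2017, day: int = 15,
                   granule_id: str = "h21v09") -> List[Dict]:
    """Keep rows of one granule acquired on `day` of any month of `year`, tagged with the month."""
    return [
        {"granule_id": r["granule_id"], "month": r["acquisition_date"].month}
        for r in rows
        if r["acquisition_date"].year == year
        and r["acquisition_date"].day == day
        and r["granule_id"] == granule_id
    ]


# Hypothetical catalog rows; only the first and last pass the filter.
catalog = [
    {"granule_id": "h21v09", "acquisition_date": date(2017, 1, 15)},
    {"granule_id": "h21v09", "acquisition_date": date(2017, 1, 16)},
    {"granule_id": "h20v09", "acquisition_date": date(2017, 2, 15)},
    {"granule_id": "h21v09", "acquisition_date": date(2016, 3, 15)},
    {"granule_id": "h21v09", "acquisition_date": date(2017, 3, 15)},
]
print(select_monthly(catalog))
```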

Step 3: Read tiles

red_nir_tiles_monthly_2017 = spark.read.raster(
    red_nir_monthly_2017,
    catalog_col_names=['red', 'nir'],
    tile_dimensions=(256, 256)
)
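`tile_dimensions=(256, 256)` asks for each scene to be cut into tiles of 256 × 256 cells, which is consistent with `data_cells` and `no_data_cells` summing to 65,536 (256 × 256) in every row of the Step 4 output. The sketch below computes such a tiling as a list of windows. The 2400 × 2400 scene size used in the usage check is an assumption about the MODIS 500 m granules, and keeping edge tiles smaller is a choice of this sketch, not a claim about how RasterFrames handles scene edges.

```python
from typing import List, Tuple

Window = Tuple[int, int, int, int]  # (col_offset, row_offset, cols, rows)


def tile_windows(width: int, height: int, tile_cols: int = 256, tile_rows: int = 256) -> List[Window]:
    """Cut a width x height grid into tiles, row by row; edge tiles are clipped to the scene."""
    windows: List[Window] = []
    for row in range(0, height, tile_rows):
        for col in range(0, width, tile_cols):
            windows.append((col, row, min(tile_cols, width - col), min(tile_rows, height - row)))
    return windows


# Usage with an assumed 2400 x 2400 scene: 10 x 10 windows, the last one only 96 x 96 cells.
windows = tile_windows(2400, 2400)
print(len(windows), windows[0], windows[-1])
```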

Step 4: Compute aggregates

from pyspark.sql.functions import col, lit
from pyrasterframes.rasterfunctions import *

result = red_nir_tiles_monthly_2017 \
    .where(st_intersects(
        st_reproject(rf_geometry(col('red')), rf_crs(col('red')).crsProj4, rf_mk_crs('EPSG:4326')),
        st_makePoint(lit(34.870605), lit(-4.729727)))
    ) \
    .groupBy('month') \
    .agg(rf_agg_stats(rf_normalized_difference(col('nir'), col('red'))).alias('ndvi_stats')) \
    .orderBy(col('month')) \
    .select('month', 'ndvi_stats.*')
result

Showing only top 5 rows.

month  data_cells  no_data_cells  min                   max                 mean                 variance
1      65523       13             -0.2519809825673534   0.8644836272040303  0.4062596673810191   0.01407280838385605
2      65521       15             -0.21232123212321233  0.8872390789756832  0.46738863804011127  0.011822698212885174
3      64425       1111           -0.36211340206185566  0.9208261617900172  0.5811071411891395   0.011570245465885753
4      64236       1300           -0.17252657399836469  0.922397476340694   0.5254596885897274   0.01087259231886667
5      60819       4717           -0.19951338199513383  0.916626036079961   0.46471430090039234  0.01215478443067744
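Each row summarizes the NDVI cells of all tiles in that month's group. A plain-Python sketch of the columns reported by `rf_agg_stats`, assuming tiles are flat lists with None marking NoData; it uses the population variance, which is an assumption and may not match RasterFrames' exact definition.

```python
import statistics
from typing import Dict, List, Optional


def agg_stats(tiles: List[List[Optional[float]]]) -> Dict[str, float]:
    """Summarize all cells of all tiles in one group (for example, one month)."""
    cells = [v for tile in tiles for v in tile]
    values = [v for v in cells if v is not None]  # None marks NoData
    if not values:
        raise ValueError("group has no data cells")
    return {
        "data_cells": len(values),
        "no_data_cells": len(cells) - len(values),
        "min": min(values),
        "max": max(values),
        "mean": statistics.mean(values),
        "variance": statistics.pvariance(values),  # population variance (assumption)
    }


# Usage: two tiny hypothetical NDVI tiles forming one month's group.
print(agg_stats([[0.5, None], [0.0, 1.0]]))
```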

SQL

For convenience, we evaluate SQL from the Python environment. The SQL fragments should work the same way in the spark-sql shell.

def sql(stmt):
    return spark.sql(stmt)

Step 1: Load the catalog

sql("CREATE OR REPLACE TEMPORARY VIEW modis USING `aws-pds-modis-catalog`")

Step 2: Down-select data by month

sql("""
CREATE OR REPLACE TEMPORARY VIEW red_nir_monthly_2017 AS
SELECT granule_id, month(acquisition_date) as month, B01 as red, B02 as nir
FROM modis
WHERE year(acquisition_date) = 2017 AND day(acquisition_date) = 15 AND granule_id = 'h21v09'
""")
sql('DESCRIBE red_nir_monthly_2017')
col_name    data_type  comment
granule_id  string
month       int
red         string
nir         string
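Both the Python and SQL versions hard-code granule_id 'h21v09', which names a tile of the MODIS sinusoidal grid. A short plain-Python sketch shows that it is the grid tile containing the point queried in Step 4. The grid constants (a sphere of radius 6371007.181 m divided into 36 × 18 square tiles) are the standard MODLAND grid definition, assumed here; nothing in the sketch comes from RasterFrames.

```python
import math

# Assumed MODLAND sinusoidal grid: a sphere of radius R, 36 x 18 square tiles.
R = 6371007.181                    # sphere radius, meters
X_MIN = -math.pi * R               # western edge of the grid (h = 0)
Y_MAX = math.pi * R / 2            # northern edge of the grid (v = 0)
TILE_SIZE = 2 * math.pi * R / 36   # tile edge length, meters (about 1111950.52)


def modis_granule_id(lon: float, lat: float) -> str:
    """Return the 'hHHvVV' tile of the MODIS sinusoidal grid containing a lon/lat point."""
    lat_rad = math.radians(lat)
    x = R * math.radians(lon) * math.cos(lat_rad)  # forward sinusoidal projection
    y = R * lat_rad
    h = int((x - X_MIN) // TILE_SIZE)
    v = int((Y_MAX - y) // TILE_SIZE)
    return f"h{h:02d}v{v:02d}"


print(modis_granule_id(34.870605, -4.729727))  # the Step 4 query point; prints h21v09
```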

Step 3: Read tiles

sql("""
CREATE OR REPLACE TEMPORARY VIEW red_nir_tiles_monthly_2017
USING raster
OPTIONS (
    catalog_table='red_nir_monthly_2017',
    catalog_col_names='red,nir',
    tile_dimensions='256,256'
    )
""")

Step 4: Compute aggregates

grouped = sql("""
SELECT month, ndvi_stats.* FROM (
    SELECT month, rf_agg_stats(rf_normalized_difference(nir, red)) as ndvi_stats
    FROM red_nir_tiles_monthly_2017
    WHERE st_intersects(st_reproject(rf_geometry(red), rf_crs(red), 'EPSG:4326'), st_makePoint(34.870605, -4.729727))
    GROUP BY month
    ORDER BY month
)
""")
grouped

Showing only top 5 rows.

month  data_cells  no_data_cells  min                   max                 mean                 variance
1      65523       13             -0.2519809825673534   0.8644836272040303  0.4062596673810191   0.01407280838385605
2      65521       15             -0.21232123212321233  0.8872390789756832  0.46738863804011127  0.011822698212885174
3      64425       1111           -0.36211340206185566  0.9208261617900172  0.5811071411891395   0.011570245465885753
4      64236       1300           -0.17252657399836469  0.922397476340694   0.5254596885897274   0.01087259231886667
5      60819       4717           -0.19951338199513383  0.916626036079961   0.46471430090039234  0.01215478443067744
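The WHERE clause reprojects each tile's footprint from its native CRS to EPSG:4326 and keeps the tiles whose footprint intersects the query point (longitude 34.870605, latitude -4.729727). For a single point this amounts to a point-in-polygon test, sketched here in plain Python with the even-odd ray-casting rule. The square footprint in the usage check is hypothetical, and points lying exactly on an edge, which `st_intersects` would count as intersecting, are not handled specially.

```python
from typing import List, Tuple

Point = Tuple[float, float]  # (lon, lat)


def contains(polygon: List[Point], pt: Point) -> bool:
    """Even-odd ray casting: count polygon edges crossed by a ray from pt toward +x."""
    x, y = pt
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        if (y1 > y) != (y2 > y):
            # x coordinate where this edge crosses the horizontal line through pt
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


# Usage: a hypothetical lon/lat footprint around the query point.
footprint = [(34.5, -5.0), (35.0, -5.0), (35.0, -4.5), (34.5, -4.5)]
print(contains(footprint, (34.870605, -4.729727)))
```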

Scala

The latest Scala API documentation is available here:

Step 1: Load the catalog

import geotrellis.proj4.LatLng
import org.locationtech.rasterframes._
import org.locationtech.rasterframes.datasource.raster._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


implicit val spark = SparkSession.builder()
  .master("local[*]")
  .appName("RasterFrames")
  .withKryoSerialization
  .getOrCreate()
  .withRasterFrames

import spark.implicits._

val modis = spark.read.format("aws-pds-modis-catalog").load()

Step 2: Down-select data by month

val red_nir_monthly_2017 = modis
  .select($"granule_id", month($"acquisition_date") as "month", $"B01" as "red", $"B02" as "nir")
  .where(year($"acquisition_date") === 2017 && (dayofmonth($"acquisition_date") === 15) && $"granule_id" === "h21v09")

Step 3: Read tiles

val red_nir_tiles_monthly_2017 = spark.read.raster
  .fromCatalog(red_nir_monthly_2017, "red", "nir")
  .load()

Step 4: Compute aggregates

val result = red_nir_tiles_monthly_2017
  .where(st_intersects(
    st_reproject(rf_geometry($"red"), rf_crs($"red"), LatLng),
    st_makePoint(34.870605, -4.729727)
  ))
  .groupBy("month")
  .agg(rf_agg_stats(rf_normalized_difference($"nir", $"red")) as "ndvi_stats")
  .orderBy("month")
  .select("month", "ndvi_stats.*")
  
result.show()