Reading Raster Data

RasterFrames registers a DataSource named raster that enables reading of GeoTIFFs (and other formats when GDAL is installed) from arbitrary URIs. The raster DataSource operates on either a single raster file location or another DataFrame, called a catalog, containing pointers to many raster file locations.

RasterFrames can also read from GeoTrellis catalogs and layers.

Single Rasters

The simplest way to use the raster reader is with a single raster from a single URI or file. In the examples that follow we’ll be reading from a Sentinel-2 scene stored in an AWS S3 bucket.

rf = spark.read.raster('https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif')
rf.printSchema()
root
 |-- proj_raster_path: string (nullable = false)
 |-- proj_raster: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)

The file at the address above is a valid Cloud Optimized GeoTIFF (COG), which RasterFrames fully supports. RasterFrames will take advantage of the optimizations in the COG format to enable more efficient reading compared to non-COG GeoTIFFs.

Let’s unpack the proj_raster column and look at the contents in more detail. It contains a CRS, a spatial extent measured in that CRS, and a two-dimensional array of numeric values called a tile.

from pyrasterframes.rasterfunctions import rf_crs, rf_extent, rf_tile

crs = rf.select(rf_crs("proj_raster").alias("value")).first()
print("CRS", crs.value.crsProj4)
CRS +proj=utm +zone=17 +datum=WGS84 +units=m +no_defs
rf.select(
    rf_extent("proj_raster").alias("extent"),
    rf_tile("proj_raster").alias("tile")
)

Showing only top 5 rows.

extent tile
[807480.0, 4207860.0, 809760.0, 4223220.0]
[792120.0, 4269300.0, 807480.0, 4284660.0]
[699960.0, 4223220.0, 715320.0, 4238580.0]
[807480.0, 4190220.0, 809760.0, 4192500.0]
[715320.0, 4223220.0, 730680.0, 4238580.0]

You can also see that the single raster has been broken out into many rows containing arbitrary non-overlapping regions. Doing so takes advantage of parallel in-memory reads from the cloud-hosted data source and allows Spark to work on manageable amounts of data per row. The map below shows downsampled imagery with the bounds of the individual tiles.

Note

The image contains visible “seams” between the tile extents due to reprojection and downsampling used to create the image. The native imagery in the DataFrame does not contain any gaps in the source raster’s coverage.
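
The tile size is adjustable: the reader's tile_dimensions parameter (used again with catalogs below) caps the number of columns and rows per tile, with a default of 256 by 256. A minimal sketch, reading the same scene with larger tiles:

# Read the same scene as (up to) 512x512 tiles instead of the 256x256 default.
rf_large = spark.read.raster(
    'https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif',
    tile_dimensions=(512, 512)
)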

Let’s select a single tile and view it. The tile preview image as well as the string representation provide some basic information about the tile: its dimensions as numbers of columns and rows and the cell type, or data type of all the cells in the tile. For more about cell types, refer to this discussion.

tile = rf.select(rf_tile("proj_raster")).first()[0]
display(tile)
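
The same properties are available as DataFrame columns, which is convenient when inspecting many tiles at once:

from pyrasterframes.rasterfunctions import rf_cell_type, rf_dimensions

rf.select(
    rf_dimensions('proj_raster').alias('dimensions'),  # columns and rows per tile
    rf_cell_type('proj_raster').alias('cell_type')
).distinct()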

Multiple Singleband Rasters

In this example, we show how to read two bands of Landsat 8 imagery (red and near-infrared) and combine them with rf_normalized_difference to compute NDVI, a common measure of vegetation health. As described in the section on catalogs, image URIs in a single row are assumed to be from the same scene/granule, and therefore compatible. This pattern is commonly used when multiple bands are stored in separate files.

from pyrasterframes.rasterfunctions import (rf_crs, rf_geometry,
    rf_normalized_difference, rf_tile_sum, st_centroid, st_reproject)
from pyspark.sql.functions import lit

bands = [f'B{b}' for b in [4, 5]]
uris = [f'https://landsat-pds.s3.us-west-2.amazonaws.com/c1/L8/014/032/LC08_L1TP_014032_20190720_20190731_01_T1/LC08_L1TP_014032_20190720_20190731_01_T1_{b}.TIF' for b in bands]
catalog = ','.join(bands) + '\n' + ','.join(uris)

rf = (spark.read.raster(catalog, bands) 
    # Adding semantic names 
    .withColumnRenamed('B4', 'red').withColumnRenamed('B5', 'NIR')  
    # Adding tile center point for reference 
    .withColumn('longitude_latitude', st_reproject(st_centroid(rf_geometry('red')), rf_crs('red'), lit('EPSG:4326')))
    # Compute NDVI  
    .withColumn('NDVI', rf_normalized_difference('NIR', 'red'))
    # For the purposes of inspection, filter out rows where there's not much vegetation  
    .where(rf_tile_sum('NDVI') > 10000) 
    # Order output 
    .select('longitude_latitude', 'red', 'NIR', 'NDVI')) 
display(rf)

Showing only top 5 rows.

longitude_latitude red NIR NDVI
POINT (-74.54798498452405 40.38740603406…
POINT (-74.27356358544107 40.66276020383…
POINT (-74.64174679709124 39.76498729658…
POINT (-75.08989515249583 39.62710756189…
POINT (-73.99582313850216 41.00659730217…
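
For reference, rf_normalized_difference computes (NIR - red) / (NIR + red) cell-wise. A rough equivalent can be sketched with RasterFrames' local map algebra functions; casting to a floating-point cell type first (here via rf_convert_cell_type) avoids integer division:

from pyrasterframes.rasterfunctions import (rf_convert_cell_type,
    rf_local_add, rf_local_divide, rf_local_subtract)

nir_f = rf_convert_cell_type('NIR', 'float32')
red_f = rf_convert_cell_type('red', 'float32')
ndvi_manual = rf.withColumn(
    'NDVI_manual',
    rf_local_divide(rf_local_subtract(nir_f, red_f),  # cell-wise NIR - red
                    rf_local_add(nir_f, red_f)))      # cell-wise NIR + red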

Multiband Rasters

A multiband raster is represented by a three dimensional numeric array stored in a single file. The first two dimensions are spatial, and the third dimension is typically designated for different spectral bands. The bands could represent intensity of different wavelengths of light (or other electromagnetic radiation), or they could measure other phenomena such as time, quality indications, or additional gas concentrations, etc.

Multiband raster files have a strictly ordered set of bands, which are typically indexed from 1. Some files have metadata tags associated with each band, and some carry a color interpretation metadata tag indicating how to interpret the bands.

When reading a multiband raster or a catalog describing multiband rasters, you need to know ahead of time which bands you want to read. Specify the bands to read, indexed from zero, as a list of integers in the band_indexes parameter of the raster reader (note that this differs from the 1-based band numbering inside the files themselves).
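
If you don't know a file's band layout in advance, any standalone tool can report it. A sketch using the rasterio package (assumed to be installed; gdalinfo works equally well):

import rasterio

# Local path assumed for illustration.
with rasterio.open('m_3807863_nw_17_1_20160620.tif') as src:
    print(src.count)        # number of bands, numbered from 1 in the file
    print(src.colorinterp)  # per-band color interpretation tags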

For example, we can read a four-band (red, green, blue, and near-infrared) image as follows. The individual rows of the resulting DataFrame still represent distinct spatial extents, with a projected raster column for each band specified by band_indexes.

mb = spark.read.raster(
    'https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif',
    band_indexes=[0, 1, 2, 3],
)
display(mb)

Showing only top 5 rows.

proj_raster_path proj_raster_b0 proj_raster_b1 proj_raster_b2 proj_raster_b3
https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif
https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif
https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif
https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif
https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif

If a band index passed in band_indexes exceeds the number of bands in the raster, a projected raster column will still be generated in the schema, but it will contain only null values.
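
A quick way to confirm this, reusing the NAIP image above and deliberately requesting an out-of-range index:

from pyspark.sql import functions as F

mb_extra = spark.read.raster(
    'https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif',
    band_indexes=[0, 4]  # index 4 is out of range for this four-band image
)
# F.count ignores nulls, so the out-of-range column counts to zero.
mb_extra.select(F.count('proj_raster_b0'), F.count('proj_raster_b4'))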

You can also pass a catalog and band_indexes together to the raster reader. This will create a projected raster column for each combination of the items in catalog_col_names and band_indexes. Again, if a band index in band_indexes exceeds the number of bands in a raster, the corresponding column will contain null values.

Here is a trivial example with a catalog over multiband rasters. We specify two columns containing URIs and two bands, resulting in four projected raster columns.

import pandas as pd
mb_cat = pd.DataFrame([
    {'foo': 'https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif',
     'bar': 'https://rasterframes.s3.amazonaws.com/samples/naip/m_3807863_nw_17_1_20160620.tif'
    },
])
mb2 = spark.read.raster(
    spark.createDataFrame(mb_cat),
    catalog_col_names=['foo', 'bar'],
    band_indexes=[0, 1],
    tile_dimensions=(64,64)
)
mb2.printSchema()
root
 |-- foo_path: string (nullable = false)
 |-- bar_path: string (nullable = false)
 |-- foo_b0: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- foo_b1: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- bar_b0: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- bar_b1: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)

URI Formats

RasterFrames relies on three different I/O drivers, selected based on a combination of scheme, file extensions, and library availability. GDAL is used by default if a compatible version of GDAL (>= 2.4) is installed and supports the specified scheme. If GDAL is not available, either the Java I/O or the Hadoop driver is selected, depending on the scheme.

Note: The GDAL driver is the only one that can read non-GeoTIFF files.

Prefix             GDAL       Java I/O  Hadoop
gdal://<vsidrv>//  yes        no        no
file://            yes        yes       no
http://            yes        yes       no
https://           yes        yes       no
ftp://             /vsicurl/  yes       no
hdfs://            /vsihdfs/  no        yes
s3://              /vsis3/    yes       no
s3n://             no         no        yes
s3a://             no         no        yes
wasb://            /vsiaz/    no        yes
wasbs://           no         no        yes

Specific GDAL Virtual File System drivers can be selected using the gdal://<vsidrv>// syntax. For example, if you have an archive.zip file containing a GeoTIFF named my-file-inside.tif, you can address it with gdal://vsizip//path/to/archive.zip/my-file-inside.tif. Another example would be an MRF file in an S3 bucket on AWS: gdal://vsis3/my-bucket/prefix/to/raster.mrf. See the GDAL documentation for the format of the URIs after the gdal:// scheme.
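
Both example URIs, written as reader calls (the archive and bucket paths are placeholders):

# GeoTIFF inside a zip archive, via GDAL's /vsizip/ driver:
in_zip = spark.read.raster('gdal://vsizip//path/to/archive.zip/my-file-inside.tif')
# MRF file in an S3 bucket, via GDAL's /vsis3/ driver:
in_s3 = spark.read.raster('gdal://vsis3/my-bucket/prefix/to/raster.mrf')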

Raster Catalogs

Recalling the definition of a catalog discussed previously, let's read the raster data referenced by the catalog URIs. We will start with the external catalog of MODIS surface reflectance.

from pyspark import SparkFiles
from pyspark.sql import functions as F

cat_filename = "2018-07-04_scenes.txt"
spark.sparkContext.addFile("https://modis-pds.s3.amazonaws.com/MCD43A4.006/{}".format(cat_filename))

modis_catalog = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load(SparkFiles.get(cat_filename)) \
    .withColumn('base_url',
        F.concat(F.regexp_replace('download_url', 'index.html$', ''), 'gid')
    ) \
    .drop('download_url') \
    .withColumn('red' , F.concat('base_url', F.lit("_B01.TIF"))) \
    .withColumn('nir' , F.concat('base_url', F.lit("_B02.TIF")))

print("Available scenes: ", modis_catalog.count())
Available scenes:  297
modis_catalog

Showing only top 5 rows.

date gid base_url red nir
2018-07-04 00:00:00 MCD43A4.A2018185.h04v09.006.2018194032851 https://modis-pds.s3.amazonaws.com/MCD43A4.006/04/09/2018185/MCD43A4.A2018185.h04v09.006.2018194032851 https://modis-pds.s3.amazonaws.com/MCD43A4.006/04/09/2018185/MCD43A4.A2018185.h04v09.006.2018194032851_B01.TIF https://modis-pds.s3.amazonaws.com/MCD43A4.006/04/09/2018185/MCD43A4.A2018185.h04v09.006.2018194032851_B02.TIF
2018-07-04 00:00:00 MCD43A4.A2018185.h01v09.006.2018194032819 https://modis-pds.s3.amazonaws.com/MCD43A4.006/01/09/2018185/MCD43A4.A2018185.h01v09.006.2018194032819 https://modis-pds.s3.amazonaws.com/MCD43A4.006/01/09/2018185/MCD43A4.A2018185.h01v09.006.2018194032819_B01.TIF https://modis-pds.s3.amazonaws.com/MCD43A4.006/01/09/2018185/MCD43A4.A2018185.h01v09.006.2018194032819_B02.TIF
2018-07-04 00:00:00 MCD43A4.A2018185.h06v03.006.2018194032807 https://modis-pds.s3.amazonaws.com/MCD43A4.006/06/03/2018185/MCD43A4.A2018185.h06v03.006.2018194032807 https://modis-pds.s3.amazonaws.com/MCD43A4.006/06/03/2018185/MCD43A4.A2018185.h06v03.006.2018194032807_B01.TIF https://modis-pds.s3.amazonaws.com/MCD43A4.006/06/03/2018185/MCD43A4.A2018185.h06v03.006.2018194032807_B02.TIF
2018-07-04 00:00:00 MCD43A4.A2018185.h03v09.006.2018194032826 https://modis-pds.s3.amazonaws.com/MCD43A4.006/03/09/2018185/MCD43A4.A2018185.h03v09.006.2018194032826 https://modis-pds.s3.amazonaws.com/MCD43A4.006/03/09/2018185/MCD43A4.A2018185.h03v09.006.2018194032826_B01.TIF https://modis-pds.s3.amazonaws.com/MCD43A4.006/03/09/2018185/MCD43A4.A2018185.h03v09.006.2018194032826_B02.TIF
2018-07-04 00:00:00 MCD43A4.A2018185.h08v09.006.2018194032839 https://modis-pds.s3.amazonaws.com/MCD43A4.006/08/09/2018185/MCD43A4.A2018185.h08v09.006.2018194032839 https://modis-pds.s3.amazonaws.com/MCD43A4.006/08/09/2018185/MCD43A4.A2018185.h08v09.006.2018194032839_B01.TIF https://modis-pds.s3.amazonaws.com/MCD43A4.006/08/09/2018185/MCD43A4.A2018185.h08v09.006.2018194032839_B02.TIF

MODIS data products are delivered on a regular, consistent grid, making identification of a specific area over time easy using (h,v) grid coordinates (see below).

MODIS Grid

For example, MODIS data right above the equator is all grid coordinates with v07.

equator = modis_catalog.where(F.col('gid').like('%v07%')) 
equator.select('date', 'gid')

Showing only top 5 rows.

date gid
2018-07-04 00:00:00 MCD43A4.A2018185.h07v07.006.2018194033213
2018-07-04 00:00:00 MCD43A4.A2018185.h08v07.006.2018194033224
2018-07-04 00:00:00 MCD43A4.A2018185.h09v07.006.2018194033547
2018-07-04 00:00:00 MCD43A4.A2018185.h26v07.006.2018194033904
2018-07-04 00:00:00 MCD43A4.A2018185.h12v07.006.2018194033912

Now that we have prepared our catalog, we simply pass the DataFrame or CSV string to the raster DataSource to load the imagery. The catalog_col_names parameter gives the columns that contain the URIs to be read.

rf = spark.read.raster(equator, catalog_col_names=['red', 'nir'])
rf.printSchema()
root
 |-- red_path: string (nullable = false)
 |-- nir_path: string (nullable = false)
 |-- red: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- nir: struct (nullable = true)
 |    |-- tile_context: struct (nullable = true)
 |    |    |-- extent: struct (nullable = false)
 |    |    |    |-- xmin: double (nullable = false)
 |    |    |    |-- ymin: double (nullable = false)
 |    |    |    |-- xmax: double (nullable = false)
 |    |    |    |-- ymax: double (nullable = false)
 |    |    |-- crs: struct (nullable = false)
 |    |    |    |-- crsProj4: string (nullable = false)
 |    |-- tile: tile (nullable = false)
 |-- date: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- base_url: string (nullable = true)

Observe that the schema of the resulting DataFrame has a projected raster struct for each column passed in catalog_col_names. For reference, each URI now appears in a column suffixed with _path. Taking a quick look at the data, we see again that each row contains an arbitrary portion of the entire scene coverage. We also see that, for two-D catalogs, all tiles in a row share the same spatial extent.

from pyrasterframes.rasterfunctions import rf_extent, rf_is_no_data_tile, rf_tile

sample = rf \
    .select('gid', rf_extent('red'), rf_extent('nir'), rf_tile('red'), rf_tile('nir')) \
    .where(~rf_is_no_data_tile('red'))
sample.limit(3)
gid rf_extent(red) rf_extent(nir) rf_tile(red) rf_tile(nir)
MCD43A4.A2018185.h16v07.006.2018194033835 [-1156428.54045364, 1630860.7621777998, -1111950.519667, 1749468.8176088398] [-1156428.54045364, 1630860.7621777998, -1111950.519667, 1749468.8176088398]
MCD43A4.A2018185.h22v07.006.2018194034134 [4566410.13409804, 1111950.519667, 4685018.18952908, 1156428.54045364] [4566410.13409804, 1111950.519667, 4685018.18952908, 1156428.54045364]
MCD43A4.A2018185.h33v07.006.2018194035136 [1.7628122238449175E7, 1275036.59588468, 1.774673029388032E7, 1393644.65131572] [1.7628122238449175E7, 1275036.59588468, 1.774673029388032E7, 1393644.65131572]

Lazy Raster Reading

By default, reading raster pixel values is delayed until absolutely needed. Until then, the DataFrame contains only metadata and pointers to the appropriate portions of the source data. This can save a significant amount of computation and I/O time, for two reasons. First, a catalog may contain a large number of rows. Second, the raster DataSource attempts to apply spatial predicates (e.g. where/WHERE clauses with st_intersects, et al.) at row creation, reducing the chance of unneeded data being fetched.
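
As a sketch of the second point, a spatial predicate attached before any action can keep out-of-area tiles from ever being fetched; here aoi_wkt is an assumed WKT polygon expressed in the rasters' CRS:

from pyrasterframes.rasterfunctions import rf_geometry, st_geomFromWKT, st_intersects
from pyspark.sql.functions import lit

aoi = st_geomFromWKT(lit(aoi_wkt))  # aoi_wkt: assumed, user-supplied WKT string
rf_aoi = spark.read.raster(equator, catalog_col_names=['red', 'nir']) \
    .where(st_intersects(rf_geometry('red'), aoi))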

Consider the following two reads of the same data source. In the first, lazy case, each row holds a pointer to the URI, extent, and band to read; it will not be evaluated until the cell values are actually required. The second case uses lazy_tiles=False to force the raster to be fully loaded right away.

uri = 'https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif'
from pyspark.sql.functions import col

lazy = spark.read.raster(uri).select(col('proj_raster.tile').cast('string'))
lazy

Showing only top 5 rows.

tile
RasterRefTile(RasterRef(JVMGeoTiffRasterSource(https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif),0,Some(Extent(807480.0, 4207860.0, 809760.0, 4223220.0)),Some(GridBounds(1792,1280,1829,1535))))
RasterRefTile(RasterRef(JVMGeoTiffRasterSource(https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif),0,Some(Extent(792120.0, 4269300.0, 807480.0, 4284660.0)),Some(GridBounds(1536,256,1791,511))))
RasterRefTile(RasterRef(JVMGeoTiffRasterSource(https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif),0,Some(Extent(699960.0, 4223220.0, 715320.0, 4238580.0)),Some(GridBounds(0,1024,255,1279))))
RasterRefTile(RasterRef(JVMGeoTiffRasterSource(https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif),0,Some(Extent(807480.0, 4190220.0, 809760.0, 4192500.0)),Some(GridBounds(1792,1792,1829,1829))))
RasterRefTile(RasterRef(JVMGeoTiffRasterSource(https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif),0,Some(Extent(715320.0, 4223220.0, 730680.0, 4238580.0)),Some(GridBounds(256,1024,511,1279))))
non_lazy = spark.read.raster(uri, lazy_tiles=False).select(col('proj_raster.tile').cast('string'))
non_lazy

Showing only top 5 rows.

tile
ArrayTile(256,256,uint16raw)
ArrayTile(38,256,uint16raw)
ArrayTile(256,256,uint16raw)
ArrayTile(256,256,uint16raw)
ArrayTile(256,256,uint16raw)

In the initial examples on this page, you may have noticed that the realized (non-lazy) tiles are shown, but we did not change lazy_tiles. Instead, we used rf_tile to explicitly request the realized tile from the lazy representation.

Spatial Indexing and Partitioning

Warning

This is an experimental feature, and may be removed.

It’s often desirable to take extra steps to ensure your data is effectively distributed over your computing resources. One way of doing so is with a “space-filling curve”, which maps an N-dimensional value to a one-dimensional value, with properties that favor keeping entities near each other in N-space near each other in index space. In particular, RasterFrames supports a space-filling curve that maps the geographic location of tiles to a one-dimensional index space called xz2. To have RasterFrames add spatial-index-based partitioning on raster reads, use the spatial_index_partitions parameter. By default it will use the same number of partitions as configured in spark.sql.shuffle.partitions.

df = spark.read.raster(uri, spatial_index_partitions=True)
df

Showing only top 5 rows.

proj_raster_path proj_raster spatial_index
https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif 55256634714
https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif 55256634714
https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif 55256634715
https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif 55256656560
https://rasterframes.s3.amazonaws.com/samples/luray_snp/B02.tif 55256722095

You can also pass a positive integer to the parameter to specify the number of desired partitions.

df = spark.read.raster(uri, spatial_index_partitions=800)
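
Because spatial_index is an ordinary long column, it can also be used downstream, for example to keep spatially nearby tiles together:

df.sort('spatial_index')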

GeoTrellis

GeoTrellis Catalogs

GeoTrellis is one of the key libraries upon which RasterFrames is built. It provides a Scala language API for working with geospatial raster data. GeoTrellis defines a tile layer storage format for persisting imagery mosaics. RasterFrames provides a DataSource supporting both reading and writing GeoTrellis layers.

A GeoTrellis catalog is a set of GeoTrellis layers. We can read a DataFrame giving details of the content of a catalog using the syntax below. The scheme is typically hdfs or file, or a cloud storage provider like s3.

gt_cat = spark.read.geotrellis_catalog('scheme://path-to-gt-catalog')

GeoTrellis Layers

The catalog will give details on the individual layers available for query. We can read each layer with the URI to the catalog, the layer name, and the desired zoom level.

gt_layer = spark.read.geotrellis(path='scheme://path-to-gt-catalog', layer=layer_name, zoom=zoom_level)

This will return a RasterFrame with additional GeoTrellis-specific metadata, stored as JSON in the metadata of the tile column.
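
That metadata can be reached through the standard Spark schema API; a minimal sketch, assuming the layer's tile column is named tile:

import json

print(json.dumps(gt_layer.schema['tile'].metadata, indent=2))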