Exporting RasterFrames

While the goal of RasterFrames is to make it as easy as possible to do your geospatial analysis within a single construct, it is often helpful to transform a RasterFrame into other representations for various use cases.

Converting to Array

The cell values within a Tile are encoded internally as an array. There may be use cases where the additional context provided by the Tile construct is no longer needed and one would prefer to work with the underlying array data.

The tileToArray column function requires a type parameter indicating the desired array element type. The following types may be used: Int, Double, Byte, Short, and Float. The example below uses Short; a variation using Double follows it.

scala> val withArrays = rf.withColumn("tileData", tileToArray[Short]($"tile")).drop("tile")
withArrays: org.apache.spark.sql.DataFrame = [spatial_key: struct<col: int, row: int>, tileData: array<smallint>]

scala> withArrays.show(5, 40)
+-----------+----------------------------------------+
|spatial_key|                                tileData|
+-----------+----------------------------------------+
|      [6,3]|[11576, 12488, 9070, 9614, 10561, 112...|
|      [4,0]|[10155, 9839, 9296, 9495, 9921, 9962,...|
|      [0,0]|[14294, 14277, 13939, 13604, 14182, 1...|
|      [4,2]|[10844, 10802, 11270, 10893, 8469, 77...|
|      [0,2]|[8896, 9062, 9143, 9285, 9345, 9341, ...|
+-----------+----------------------------------------+
only showing top 5 rows
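
The same call works for the other supported element types. For example, here is a small sketch extracting the cells as Double instead, following the same pattern as above:

// Variation on the example above: extract cell values as Double instead of Short.
val withDoubleArrays = rf.withColumn("tileData", tileToArray[Double]($"tile")).drop("tile")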

You can convert the array data back to a tile with arrayToTile, but you have to specify the target tile dimensions.

scala> val tileBack = withArrays.withColumn("tileAgain", arrayToTile($"tileData", 128, 128))
tileBack: org.apache.spark.sql.DataFrame = [spatial_key: struct<col: int, row: int>, tileData: array<smallint> ... 1 more field]

scala> tileBack.drop("tileData").show(5, 40)
+-----------+--------------------------------------+
|spatial_key|                             tileAgain|
+-----------+--------------------------------------+
|      [6,3]|ShortRawArrayTile([S@14922268,128,128)|
|      [4,0]| ShortRawArrayTile([S@663b31a,128,128)|
|      [0,0]| ShortRawArrayTile([S@69b96cd,128,128)|
|      [4,2]|ShortRawArrayTile([S@5e4b9f22,128,128)|
|      [0,2]|ShortRawArrayTile([S@5b4f77e1,128,128)|
+-----------+--------------------------------------+
only showing top 5 rows

Note that the created tile will not have a NoData value associated with it. Here's how you can specify one with withNoData:

scala> val tileBackAgain = withArrays.withColumn("tileAgain", withNoData(arrayToTile($"tileData", 128, 128), 3))
tileBackAgain: org.apache.spark.sql.DataFrame = [spatial_key: struct<col: int, row: int>, tileData: array<smallint> ... 1 more field]

scala> tileBackAgain.drop("tileData").show(5, 50)
+-----------+--------------------------------------------------+
|spatial_key|                                         tileAgain|
+-----------+--------------------------------------------------+
|      [6,3]|ShortUserDefinedNoDataArrayTile([S@43aa4e0d,128...|
|      [4,0]|ShortUserDefinedNoDataArrayTile([S@54a30614,128...|
|      [0,0]|ShortUserDefinedNoDataArrayTile([S@166913c2,128...|
|      [4,2]|ShortUserDefinedNoDataArrayTile([S@301b21f,128,...|
|      [0,2]|ShortUserDefinedNoDataArrayTile([S@124a5e54,128...|
+-----------+--------------------------------------------------+
only showing top 5 rows

Writing to Parquet

It is often useful to write Spark results in a form that is easily reloaded for subsequent analysis. The Parquet columnar storage format, native to Spark, is ideal for this. RasterFrames work just like any other DataFrame in this scenario, as long as rfInit is called to register the imagery types.
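
If you are starting a fresh session to read such files, the registration step might look like the following (a minimal sketch; it assumes the RasterFrames import used elsewhere in this documentation and an existing SparkSession named spark):

import astraea.spark.rasterframes._

// Register the RasterFrames UDTs (e.g. rf_tile) and functions with the session
// before reading Parquet files that contain tile columns.
rfInit(spark.sqlContext)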

Let’s assume we have a RasterFrame we’ve done some fancy processing on:

import geotrellis.raster.equalization._
val equalizer = udf((t: Tile) => t.equalize())
val equalized = rf.withColumn("equalized", equalizer($"tile")).asRF
scala> equalized.printSchema
root
 |-- spatial_key: struct (nullable = true)
 |    |-- col: integer (nullable = false)
 |    |-- row: integer (nullable = false)
 |-- tile: rf_tile (nullable = true)
 |-- equalized: rf_tile (nullable = true)


scala> equalized.select(aggStats($"tile")).show(false)
+---------+------+-------+-----------------+------------------+
|dataCells|min   |max    |mean             |variance          |
+---------+------+-------+-----------------+------------------+
|387000   |7209.0|39217.0|10160.48549870801|3315238.5311127007|
+---------+------+-------+-----------------+------------------+


scala> equalized.select(aggStats($"equalized")).show(false)
+---------+---+-------+------------------+-------------------+
|dataCells|min|max    |mean              |variance           |
+---------+---+-------+------------------+-------------------+
|458724   |4.0|65535.0|32763.474431248418|3.025128587936561E8|
+---------+---+-------+------------------+-------------------+

We write it out just like any other DataFrame, including the ability to specify partitioning:

val filePath = "/tmp/equalized.parquet"
equalized.select("*", "spatial_key.*").write.partitionBy("col", "row").mode(SaveMode.Overwrite).parquet(filePath)

Let’s confirm partitioning happened as expected:

scala> import java.io.File
import java.io.File

scala> new File(filePath).list.filter(f => !f.contains("_"))
res8: Array[String] = Array(col=0, col=1, col=6, col=3, col=4, col=5, col=2)

Now we can load the data back in and check it out:

val rf2 = spark.read.parquet(filePath)
scala> rf2.printSchema
root
 |-- spatial_key: struct (nullable = true)
 |    |-- col: integer (nullable = true)
 |    |-- row: integer (nullable = true)
 |-- tile: rf_tile (nullable = true)
 |-- equalized: rf_tile (nullable = true)
 |-- col: integer (nullable = true)
 |-- row: integer (nullable = true)


scala> rf2.select(aggStats($"tile")).show(false)
+---------+------+-------+-----------------+------------------+
|dataCells|min   |max    |mean             |variance          |
+---------+------+-------+-----------------+------------------+
|387000   |7209.0|39217.0|10160.48549870801|3315238.5311127007|
+---------+------+-------+-----------------+------------------+


scala> rf2.select(aggStats($"equalized")).show(false)
+---------+---+-------+------------------+-------------------+
|dataCells|min|max    |mean              |variance           |
+---------+---+-------+------------------+-------------------+
|458724   |4.0|65535.0|32763.474431248418|3.025128587936561E8|
+---------+---+-------+------------------+-------------------+

Exporting a Raster

For the purposes of debugging, the RasterFrame tiles can be reassembled back into a raster for viewing. However, keep in mind that this will collect all the data to the driver and reassemble it in memory, so it is not appropriate for very large coverages.
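
One rough way to gauge whether that is feasible is to check how much data would be collected first (a small sketch; the 128x128 tile size comes from the data used throughout this section):

// Each row holds one 128x128 tile per tile column (dimensions from this section's data).
val tileCount = equalized.count()
println(s"toRaster will collect roughly ${tileCount * 128 * 128} cells per tile column")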

Here’s how one might render the image to a georeferenced GeoTIFF file:

import geotrellis.raster.io.geotiff.GeoTiff
val image = equalized.toRaster($"equalized", 774, 500)
GeoTiff(image).write("target/scala-2.11/tut/rf-raster.tiff")


Here’s how one might render a raster frame to a false color PNG file:

import geotrellis.raster.render._
val colors = ColorMap.fromQuantileBreaks(image.tile.histogram, ColorRamps.BlueToOrange)
image.tile.color(colors).renderPng().write("target/scala-2.11/tut/rf-raster.png")

Exporting to a GeoTrellis Layer

For future analysis it is helpful to persist a RasterFrame as a GeoTrellis layer.

First, convert the RasterFrame into a TileLayerRDD. The return type is an Either; the left side carries spatially keyed (SpatialKey) data and the right side carries spatio-temporally keyed (SpaceTimeKey) data.

val tlRDD = equalized.toTileLayerRDD($"equalized").left.get
// tlRDD: geotrellis.spark.TileLayerRDD[geotrellis.spark.SpatialKey] = ContextRDD[70] at RDD at ContextRDD.scala:35
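
The .left.get call assumes the data is keyed spatially; a safer pattern, sketched here, is to match on the Either so both key types are handled explicitly:

// Match on the Either rather than assuming spatial-only keys.
val spatialOnly: TileLayerRDD[SpatialKey] =
  equalized.toTileLayerRDD($"equalized") match {
    case Left(spatial)    => spatial  // keyed by SpatialKey only
    case Right(spaceTime) => sys.error("Expected a spatial-only layer, got SpaceTimeKey data")
  }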

Then create a GeoTrellis layer writer:

import java.nio.file.Files
import spray.json._
import DefaultJsonProtocol._
import geotrellis.spark.io._
val p = Files.createTempDirectory("gt-store")
val writer: LayerWriter[LayerId] = LayerWriter(p.toUri)

val layerId = LayerId("equalized", 0)
writer.write(layerId, tlRDD, index.ZCurveKeyIndexMethod)

Take a look at the metadata in JSON format:

scala> AttributeStore(p.toUri).readMetadata[JsValue](layerId).prettyPrint
res15: String =
{
  "extent": {
    "xmin": 431902.5,
    "ymin": 4313647.5,
    "xmax": 443512.5,
    "ymax": 4321147.5
  },
  "layoutDefinition": {
    "extent": {
      "xmin": 431902.5,
      "ymin": 4313467.5,
      "xmax": 445342.5,
      "ymax": 4321147.5
    },
    "tileLayout": {
      "layoutCols": 7,
      "layoutRows": 4,
      "tileCols": 128,
      "tileRows": 128
    }
  },
  "bounds": {
    "minKey": {
      "col": 0,
      "row": 0
    },
    "maxKey": {
      "col": 6,
      "row": 3
    }
  },
  "cellType": "uint16",
  "crs": "+proj=utm +zone=16 +datum=WGS84 +units=m +no_defs "
}
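
To load the layer back in later, construct a matching reader. Since the layer above was written to a local directory, a sketch using GeoTrellis' file-backed reader might look like this (which reader applies depends on the backing store):

import geotrellis.raster._
import geotrellis.spark._
import geotrellis.spark.io.file.FileLayerReader

// FileLayerReader requires an implicit SparkContext.
implicit val sc = spark.sparkContext
val reader = FileLayerReader(p.toString)

// Read the spatially keyed tiles and their layer metadata back from the catalog.
val restored: TileLayerRDD[SpatialKey] =
  reader.read[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](layerId)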

Converting to RDD and TileLayerRDD

Since a RasterFrame is just a DataFrame with extra metadata, the method DataFrame.rdd is available for simple conversion back to RDD space. The type returned by .rdd is dependent upon how you select it.

scala> import scala.reflect.runtime.universe._
import scala.reflect.runtime.universe._

scala> def showType[T: TypeTag](t: T) = println(implicitly[TypeTag[T]].tpe.toString)
showType: [T](t: T)(implicit evidence$1: reflect.runtime.universe.TypeTag[T])Unit

scala> showType(rf.rdd)
org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

scala> showType(rf.select(rf.spatialKeyColumn, $"tile".as[Tile]).rdd) 
org.apache.spark.rdd.RDD[(geotrellis.spark.SpatialKey, geotrellis.raster.Tile)]

scala> showType(rf.select(rf.spatialKeyColumn, $"tile").as[(SpatialKey, Tile)].rdd) 
org.apache.spark.rdd.RDD[(geotrellis.spark.SpatialKey, geotrellis.raster.Tile)]

If your goal is to convert a single tile column with its spatial key back to a TileLayerRDD[K], there's an additional extension method on RasterFrame called toTileLayerRDD, which preserves the tile layer metadata and enhances interoperation with GeoTrellis RDD extension methods, as sketched after the example below.

scala> showType(rf.toTileLayerRDD($"tile".as[Tile]))
scala.Either[geotrellis.spark.TileLayerRDD[geotrellis.spark.SpatialKey],geotrellis.spark.TileLayerRDD[geotrellis.spark.SpaceTimeKey]]
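
For instance, because the layer metadata is preserved, GeoTrellis extension methods apply directly to the result. Here is a sketch (assuming the stitch implicits from geotrellis.spark are in scope) that stitches the tiles back into a single raster:

import geotrellis.spark._

// The preserved TileLayerMetadata lets GeoTrellis extension methods such as
// stitch (which collects all tiles to the driver) operate on the result directly.
val layer = rf.toTileLayerRDD($"tile".as[Tile]).left.get
val stitched = layer.stitch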