Configurations

td-spark.conf

To use the td-spark library in your Spark applications, create a td-spark.conf file and pass it to the --properties-file option of the spark-submit (or spark-shell) command.

td-spark.conf

spark.td.site us
spark.td.apikey (your TD API key)
# Using KryoSerializer is recommended for faster performance
spark.serializer org.apache.spark.serializer.KryoSerializer
  • spark.td.site can be one of us, jp, eu01, or ap02, depending on your TD account region.

  • spark.td.apikey is your TD API key. To learn how to get TD API keys, see also: Getting Your API Keys.

To configure the JVM memory size for Spark and other settings, add additional configurations to the td-spark.conf file:

# default 1g
spark.driver.memory 2g
# default 1g
spark.executor.memory 2g
# (optional) If you see an error around Kryo serializer
spark.kryoserializer.buffer.max 1g

See also the list of available properties in Spark.

Using Spark Shell

To start using Spark Shell, a REPL interface for Scala and Spark, two options are available: downloading Spark or using Docker:

Downloading Spark

  • Spark itself

    • Download a pre-compiled package of Spark from the download page

  • td-spark-assembly.jar file

To launch the Spark shell, specify the paths to your td-spark-assembly.jar and td-spark.conf files:

$ ./bin/spark-shell --jars (path to td-spark-assembly.jar) --properties-file (path to td-spark.conf)

Docker

If you have Docker installed on your machine, you can quickly start using td-spark without manually downloading Spark or the td-spark library. Launch the td-spark shell by setting your TD API key as the TD_API_KEY environment variable:

$ export TD_API_KEY=(your TD API key)
$ docker run -it -e TD_API_KEY=$TD_API_KEY devtd/td-spark-shell:latest_spark3.1.1

This image automatically imports the td-spark package and sets the td variable as described below.

td: TDSparkContext

To use the functionality of td-spark, import com.treasuredata.spark._ and create a TDSparkContext (td) from the current Spark session (spark variable):

scala> import com.treasuredata.spark._; val td = spark.td

This td variable (TDSparkContext) will be an entry point to access Treasure Data.

Now you are ready to read tables in TD with td.table("(db name).(table name)"):

// Read a table in TD as a Spark DataFrame
scala> val df = td.table("sample_datasets.www_access").df
// Show the content of the DataFrame
scala> df.show
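
The settings from td-spark.conf are also visible through the Spark session, which is handy for double-checking what was picked up at launch. A minimal sketch (the example values come from the configuration shown earlier):

// Inspect td-spark related settings from the running session
spark.conf.get("spark.td.site")       // e.g., "us"
spark.conf.get("spark.driver.memory") // e.g., "2g"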

Reading Tables

within: Specifying Time Ranges of Tables

Treasure Data is a time-series database, and your data grows every day. To efficiently find a target portion of your data, it is important to specify a time range of the table. To do so, use td.table("table name") and the within filtering method. After applying the filtering, create a DataFrame with the .df method.

For example, if you want to read the data collected during the last day, use .within("-1d") to narrow down the time range to scan:

val df = td.table("(database).(table)").within("-1d").df

This within(...) syntax supports the same interval string representations as TD_INTERVAL.

Here are several examples to specify time ranges of a table:

import java.time.ZoneId

val t = td.table("sample_datasets.www_access")

// Using duration strings as in TD_INTERVAL
t.within("-1d") // last day
t.within("-1m/2014-10-03 09:13:00") // last 1 minute from a given offset
t.within("-1d", java.time.ZoneId.of("Asia/Tokyo")) // last day in JST tiemzone

// Specifying unix time ranges
// [from, until) unix time range
t.withinUnixTimeRange(from = 1412320845, until = 1412321000)

t.fromUnixTime(1412320845) // [from, ...)
t.untilUnixTime(1412321000) // [... , until)

// Using time strings yyyy-MM-dd HH:mm:ss (timezone default: UTC)
t.fromTime("2014-10-03 09:12:00") // [from, ...)
t.untilTime("2014-10-03 09:13:00") // [..., until)

// [from, until)
t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00")
// [from, until) in Asia/Tokyo timezone
t.withinTimeRange("2014-10-03 09:12:00", "2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo"))

// Specifying timezone
// [from, ...) in Asia/Tokyo timezone
t.fromTime("2014-10-03 09:12:00", ZoneId.of("Asia/Tokyo"))
// [..., until) in Asia/Tokyo timezone
t.untilTime("2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo"))

The following date-time formats are supported in the from and until arguments of withinTimeRange:

  • yyyy-MM-dd HH:mm:ss zzz

    • e.g., 2017-01-01 00:00:00 UTC, 2017-01-01 00:00:00 America/Los_Angeles, 2017-01-01 00:00:00Z, etc.

  • yyyy-MM-dd

    • e.g., 2016-12-01. UTC timezone will be used

  • unix time

    • e.g., 1412327520. Unix time is the number of seconds since 1970-01-01 00:00:00Z
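
For example, any of these formats can be passed to withinTimeRange (a minimal sketch reusing the table t defined above; the timestamp values are illustrative):

// yyyy-MM-dd HH:mm:ss zzz strings
t.withinTimeRange(from = "2017-01-01 00:00:00 UTC", until = "2017-01-02 00:00:00 UTC")
// Date-only strings (interpreted in UTC)
t.withinTimeRange(from = "2016-12-01", until = "2016-12-02")
// Unix time in seconds
t.withinTimeRange(from = "1412320845", until = "1412321000")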

Note: An access token for reading partition data expires in an hour by default, so if you have a long-running Spark job, you may receive the following error message:

The access token is expired: ... You may need to set a longer expiration time with TDTable.withExpirationSec(seconds) for td-spark.

To extend the expiration period beyond 3600 seconds (= 1 hour), use withExpirationSec:

val t = td.table("sample_datasets.www_access").withExpirationSec(7200)

DataFrames

After reading a table as a DataFrame, you can use Spark’s functionality for manipulating table data. See also Spark SQL: Getting Started for more detailed usage of DataFrames.

scala> val df = td.table("sample_datasets.www_access").df
df: org.apache.spark.sql.DataFrame = [user: bigint, host: string, path: string, referer: string, code: bigint, agent: string, size: bigint, method: string, time: bigint]

scala> df.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 128.201.191.32|    /category/office|  /item/finance/4164| 200|Mozilla/5.0 (Wind...|  42|   GET|1412351990|
|null| 40.129.109.180| /item/software/2265|                   -| 200|Mozilla/5.0 (Maci...|  48|   GET|1412351976|
|null|  52.120.217.35|      /category/toys|                   -| 200|Mozilla/5.0 (Wind...| 127|   GET|1412351961|
|null|  52.36.173.175|/item/electronics...|http://www.google...| 200|Mozilla/5.0 (comp...| 135|   GET|1412351947|
|null|   64.51.37.197|  /category/software|/category/electro...| 200|Mozilla/5.0 (comp...|  62|   GET|1412351932|
|null| 184.90.157.127|/item/electronics...|/search/?c=Electr...| 200|Mozilla/4.0 (comp...| 114|   GET|1412351918|
|null| 172.207.112.61|   /category/jewelry|/category/electro...| 200|Mozilla/5.0 (Wind...| 105|   GET|1412351904|
|null| 168.186.74.157|/category/books?f...|  /item/software/917| 200|Mozilla/5.0 (comp...| 110|   GET|1412351889|
|null|  132.111.61.48|/category/office?...|                   -| 200|Mozilla/5.0 (Wind...| 135|   GET|1412351875|
|null|   32.60.103.31|     /category/books|/category/electro...| 200|Mozilla/5.0 (Wind...| 113|   GET|1412351861|
|null|  192.78.111.80|   /item/office/4848|                   -| 200|Mozilla/4.0 (comp...|  70|   GET|1412351846|
|null|  40.195.100.76|/category/electro...|                   -| 200|Mozilla/5.0 (comp...|  41|   GET|1412351832|
|null|  64.30.110.187|   /category/jewelry|                   -| 200|Mozilla/5.0 (Wind...|  85|   GET|1412351817|
|null|128.147.132.146|   /category/jewelry|/category/electro...| 200|Mozilla/5.0 (comp...|  97|   GET|1412351803|
|null|    40.90.71.68| /item/software/4079|  /category/software| 200|Mozilla/5.0 (Wind...| 114|   GET|1412351789|
|null|   72.81.86.154|/category/electro...|http://www.google...| 200|Mozilla/5.0 (Wind...| 129|   GET|1412351774|
|null| 156.42.197.129|     /category/books|                   -| 200|Mozilla/5.0 (Wind...| 133|   GET|1412351760|
|null|  176.36.215.78|/category/electro...|                   -| 200|Mozilla/5.0 (Wind...|  84|   GET|1412351746|
|null|204.117.145.144|   /item/cameras/940|/category/electro...| 200|Mozilla/5.0 (Wind...| 126|   GET|1412351731|
|null|  180.96.64.110|/item/electronics...|     /category/games| 200|Mozilla/4.0 (comp...| 112|   GET|1412351717|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows


scala> df.groupBy("method").count().show
+------+-----+
|method|count|
+------+-----+
|   GET| 4624|
|  POST|  376|
+------+-----+


scala> df.filter(df("method") === "POST").show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|  156.93.73.140|/search/?c=Electr...|                   -| 200|Mozilla/4.0 (comp...|  79|  POST|1412351010|
|null| 204.180.29.118|/search/?c=Electr...|                   -| 200|Mozilla/4.0 (comp...|  69|  POST|1412350836|
|null|180.159.172.114|    /search/?c=Books|                   -| 200|Mozilla/5.0 (comp...|  50|  POST|1412350517|
|null|  96.24.207.218|     /search/?c=Toys|                   -| 200|Mozilla/5.0 (comp...| 103|  POST|1412350285|
|null| 172.159.143.86|   /search/?c=Health|                   -| 200|Mozilla/5.0 (comp...|  86|  POST|1412350212|
|null|124.192.113.194|/search/?c=Books+...| /search/?c=Software| 200|Mozilla/5.0 (Maci...|  72|  POST|1412350140|
|null|164.147.150.199|/search/?c=Electr...|/item/electronics...| 200|Mozilla/5.0 (Maci...|  75|  POST|1412350052|
|null|  68.108.188.24|/search/?c=Networ...|                   -| 200|Mozilla/5.0 (Maci...|  77|  POST|1412350023|
|null|  204.210.70.32|/search/?c=Books+...|                   -| 200|Mozilla/5.0 (Wind...| 136|  POST|1412350009|
|null|  80.30.170.153| /search/?c=Software|                   -| 200|Mozilla/5.0 (Wind...| 139|  POST|1412349980|
|null|  212.33.136.97|/search/?c=Jewelr...|  /category/software| 200|Mozilla/4.0 (comp...|  49|  POST|1412349746|
|null|  224.165.78.90|/search/?c=Garden...|/search/?c=Books+...| 200|Mozilla/5.0 (Wind...| 100|  POST|1412349294|
|null| 136.84.211.209|/search/?c=Electr...|  /item/finance/3680| 200|Mozilla/5.0 (Wind...| 137|  POST|1412349162|
|null|   100.93.27.59|/search/?c=Electr...|   /category/cameras| 200|Mozilla/5.0 (comp...| 133|  POST|1412348898|
|null| 220.216.212.78|/search/?c=Computers|                   -| 200|Mozilla/5.0 (Wind...|  42|  POST|1412348796|
|null|100.177.172.122|/search/?c=Games+...|                   -| 200|Mozilla/5.0 (Wind...|  41|  POST|1412348766|
|null| 20.102.185.162|/search/?c=Electr...|                   -| 200|Mozilla/5.0 (Wind...| 138|  POST|1412348737|
|null| 132.54.226.209|/search/?c=Electr...|  /category/software| 200|Mozilla/4.0 (comp...|  81|  POST|1412348590|
|null| 156.228.103.39|    /search/?c=Music|    /item/books/2102| 200|Mozilla/5.0 (Wind...|  46|  POST|1412348561|
|null|    24.84.40.51|     /search/?c=Toys|                   -| 200|Mozilla/5.0 (Wind...|  90|  POST|1412348531|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows

Querying Tables

Presto

You can also read Presto query results as DataFrames.

  • td.presto(sql) for streaming Presto query results to Spark

  • td.prestoJob(sql) for running queries with large results. This supports internal query retries (e.g., in case of network connection failures)

  • td.executePresto(sql) for running non-query Presto statements, e.g., CREATE TABLE, DROP TABLE, etc. (a sketch follows the Presto example below)

scala> val df = td.presto("select * from www_access limit 10")
df: org.apache.spark.sql.DataFrame = [user: string, host: string, path: string, referer: string, code: bigint, agent: string, size: bigint, method: string, time: bigint]

scala> df.show
2016-07-11 16:47:20-0700  info [TDJobRelation]
Submitted presto query:
select * from www_access limit 10
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|  76.45.175.151|   /item/sports/4642|/search/?c=Sports...| 200|Mozilla/5.0 (Maci...| 137|   GET|1412380793|
|null|184.162.105.153|   /category/finance|                   -| 200|Mozilla/4.0 (comp...|  68|   GET|1412380784|
|null|  144.30.45.112|/item/electronics...| /item/software/4777| 200|Mozilla/5.0 (Maci...| 136|   GET|1412380775|
|null|  68.42.225.106|/category/networking|/category/electro...| 200|Mozilla/4.0 (comp...|  98|   GET|1412380766|
|null| 104.66.194.210|     /category/books|                   -| 200|Mozilla/4.0 (comp...|  43|   GET|1412380757|
|null|    64.99.74.69|  /item/finance/3775|/category/electro...| 200|Mozilla/5.0 (Wind...|  86|   GET|1412380748|
|null| 136.135.51.168|/item/networking/540|                   -| 200|Mozilla/5.0 (Wind...|  89|   GET|1412380739|
|null|   52.99.134.55|   /item/health/1326|/category/electro...| 200|Mozilla/5.0 (Maci...|  51|   GET|1412380730|
|null|  136.51.116.68|/category/finance...|                   -| 200|Mozilla/5.0 (comp...|  99|   GET|1412380721|
|null|136.141.218.177| /item/computers/959|                   -| 200|Mozilla/5.0 (Wind...| 124|   GET|1412380712|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
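
td.executePresto is intended for statements that return no result set, such as DDL. A minimal sketch (mydb and my_table are hypothetical names):

// Run non-query Presto statements
td.executePresto("CREATE TABLE IF NOT EXISTS mydb.my_table (id bigint)")
td.executePresto("DROP TABLE IF EXISTS mydb.my_table")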

Hive

For running queries with Hive, use td.hiveJob(sql):

scala> val j = td.hiveJob("select 1")
2019-11-22 19:53:19.296Z  info [TDSparkContext]  - (TDSparkContext.scala:248)
Submitted job 622075532:
select 1
j: com.treasuredata.spark.TDJobRelation = TDJobRelation(TDSparkContext(local-1574451364721,spark.td.applicationId -> local-1574451364721, spark.td.local.dir -> /tmp),622075532,hive)

scala> j.jobId
res7: String = 622075532

scala> j.df.show
+---+
|_c0|
+---+
|  1|
+---+

Two-Step Job Execution

If you want to submit a long-running query and read the result later, use td.prestoJob(sql) (for Presto) or td.hiveJob(sql) (for Hive), then read the job result by specifying the job_id with td.jobResult(jobId):

# Submit a query
scala> val job = td.prestoJob("select 10")
2017-09-05 13:02:06-0700  info [TDSparkContext]  - (TDSparkContext.scala:78)
Submitted job 171612415:
select 10
job: com.treasuredata.spark.TDJobRelation = TDJobRelation(TDSparkContext(local-1504641337677,spark.td.plazma_api.host -> api-plazma.treasuredata.com, spark.td.api.host -> api.treasuredata.com),171612415)

# This method will block until the job completes
scala> val result = td.jobResult(job.jobId)
result: org.apache.spark.sql.DataFrame = [_col0: bigint]

scala> result.show
2017-09-05 13:02:58-0700  info [TDJobRelation] Reading job 171612415 result - (TDJobRelation.scala:132)
+-----+
|_col0|
+-----+
|   10|
+-----+

Spark SQL

It is also possible to process a SQL query entirely on the Spark cluster side by mapping tables in TD to TempViews in Spark. Note that Spark SQL is generally slower than using Presto because it downloads the source table data to your Spark cluster before processing the query.

# Register DataFrame as a temporary table
scala> td.table("hb_tiny.rankings").df.createOrReplaceTempView("rankings")

scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]

scala> q1.show
2021-05-10 14:15:58.130Z  info [PartitionScanner] Fetching the partition list of hb_tiny.rankings (Full Scan) - (PartitionScanner.scala:29)
2021-05-10 14:15:59.457Z  info [PartitionScanner] Retrieved 2 partition entries - (PartitionScanner.scala:36)
+--------------------+---------+
|            page_url|page_rank|
+--------------------+---------+
|xjhmjsuqolfklbvxn...|      251|
|seozvzwkcfgnfuzfd...|      165|
|fdgvmwbrjlmvuoquy...|      132|
|gqghyyardomubrfsv...|      108|
|qtqntqkvqioouwfuj...|      278|
|wrwgqnhxviqnaacnc...|      135|
|  cxdmunpixtrqnvglnt|      146|
| ixgiosdefdnhrzqomnf|      126|
|xybwfjcuhauxiopfi...|      112|
|ecfuzdmqkvqktydvi...|      237|
|dagtwwybivyiuxmkh...|      177|
|emucailxlqlqazqru...|      134|
|nzaxnvjaqxapdjnzb...|      119|
|       ffygkvsklpmup|      332|
|hnapejzsgqrzxdswz...|      171|
|rvbyrwhzgfqvzqkus...|      148|
|knwlhzmcyolhaccqr...|      104|
|nbizrgdziebsaecse...|      665|
|jakofwkgdcxmaaqph...|      187|
|kvhuvcjzcudugtidf...|      120|
+--------------------+---------+
only showing top 20 rows

Writing Tables

Uploading DataFrames to TD

To upload a DataFrame to TD, use df.write.td, df.createOrReplaceTD, or df.insertIntoTD:

// This import statement is necessary to support createOrReplaceTD and insertIntoTD for your own DataFrames
import com.treasuredata.spark._

val df: DataFrame = ... // Prepare some DataFrame

// Writes DataFrame to a new TD table. If the table already exists, this will fail.
df.write.td("(database name).(table name)")

// Create or replace the target TD table with the contents of DataFrame
df.createOrReplaceTD("(database name).(table name)")

// Append the contents of DataFrame to the target TD table.
// If the table doesn't exist, a new table will be created
df.insertIntoTD("(database name).(table name)")

Using Save Mode of DataFrames

You can also use the full DataFrame.write and save syntax. Specify the format as com.treasuredata.spark, then select the target table with .option("table", "(target database).(table name)"):

df.write // df is an arbitrary DataFrame
  .mode("overwrite") // append, overwrite, error, ignore
  .format("com.treasuredata.spark")
  .option("table", "(database name).(table name)") // Specify an upload taget table
  .save

Available write modes (SaveMode)

  • append: Append to the target table. Throws an error if the table doesn’t exist.

  • overwrite: This performs a two-step update: first it deletes the target table if it exists, then it creates a new table with the new data.

  • error: (default) If the target table already exists, throws an exception.

  • ignore: If the target table exists, ignores the save operation.
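
For example, appending to an existing table uses the same write syntax with mode("append") (a minimal sketch; the table name is a placeholder and the target table is assumed to exist):

df.write // df is an arbitrary DataFrame
  .mode("append") // throws an error if the target table doesn't exist
  .format("com.treasuredata.spark")
  .option("table", "(database name).(table name)")
  .save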

Writing Data Using Spark SQL

Writing tables inside Spark SQL is supported. This is an example of creating a new table with CREATE TABLE ... AS SELECT:

spark.sql("""
CREATE TABLE my_table
USING com.treasuredata.spark
OPTIONS(table '(database name).(table name)')
AS SELECT ...
""")

Creating New Databases and Tables

The following functions are supported for creating/dropping tables:

td.table(...).exists
td.table(...).dropIfExists
td.table(...).createIfNotExists
td.database(...).exists
td.database(...).dropIfExists
td.database(...).createIfNotExists
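
For example, these methods can be combined to prepare a destination database and table before writing (a minimal sketch; mydb and mytable are hypothetical names):

// Create the database and table only if they don't already exist
td.database("mydb").createIfNotExists
td.table("mydb.mytable").createIfNotExists

// Later, drop the table if it is no longer needed
td.table("mydb.mytable").dropIfExists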

Creating New UDP Tables

When you know that exact-match lookups on a column (e.g., id values) of the table will be necessary, use UDP (User-Defined Partitioning). You can create UDP tables with these methods:

td.createLongPartitionedTable("table_name", "(long type column name)")
td.createStringPartitionedTable("table_name", "(string type column name)")

Swapping Table Contents

This feature is available since td-spark 20.5.0.

You can swap the contents of two tables. The input tables must be in the same database:

// Swap the contents of two tables
td.swapTables("db1.table1", "db1.table2")

// Another way to swap tables
td.table("db1.table1").swapTableWith("table2")

Advanced Usage

Using multiple TD accounts

To access data in a different TD account from the one specified in td-spark.conf, use td.withApiKey(apikey):

// Returns a new TDSparkContext with the specified key
val td2 = td.withApiKey("key2")

For reading tables with the new key, use td2:

// Read a table with key2
val df = td2.table("sample_datasets.www_access").df

You can also specify a new key in the data source options:

df.write
  .mode("overwrite")
  .format("com.treasuredata.spark")
  .option("table", "mydb.tbl1")
  .option("apikey", "key2") // Specify a new api key
  .save