Configurations
td-spark.conf
To use the td-spark library in your Spark applications, create a td-spark.conf file and pass it to the --properties-file option of the spark-submit (or spark-shell) command.
td-spark.conf
spark.td.site us
spark.td.apikey (your TD API key)
# Using KryoSerializer is recommended for faster performance
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.td.site can be one of us, jp, eu01, or ap02, depending on your TD account region. spark.td.apikey is your TD API key. To learn how to get TD API keys, see also: Getting Your API Keys.
To configure the JVM memory size for Spark and other settings, add additional configurations to the td-spark.conf file:
# default 1g
spark.driver.memory 2g
# default 1g
spark.executor.memory 2g
# (optional) If you see an error around Kryo serializer
spark.kryoserializer.buffer.max 1g
See also the list of available properties in Spark.
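If you prefer not to use a properties file, for example in a self-contained Spark application, the same properties can also be set programmatically when building the SparkSession. The following is a minimal sketch that assumes the API key is supplied through the TD_API_KEY environment variable:
import org.apache.spark.sql.SparkSession

// Build a SparkSession with the same properties as td-spark.conf, set in code.
// Reading the key from the TD_API_KEY environment variable is an assumption;
// any secure way of passing the key works.
val spark = SparkSession.builder()
  .appName("td-spark-example")
  .config("spark.td.site", "us")
  .config("spark.td.apikey", sys.env("TD_API_KEY"))
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()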
Using Spark Shell
To start using the Spark shell, a REPL interface for Scala and Spark, two options are available: downloading Spark yourself or using Docker.
Downloading Spark
Spark itself: download a pre-compiled package of Spark from the download page.
td-spark-assembly.jar file: download it from the Download page.
To launch the Spark shell, specify the paths to your td-spark-assembly.jar and td-spark.conf files:
$ ./bin/spark-shell --jars (path to td-spark-assembly.jar) --properties-file (path to td-spark.conf)
Docker
If you have Docker installed on your machine, you can quickly start using td-spark without manually downloading Spark or the td-spark library. You can launch the td-spark shell by setting your TD API key in the TD_API_KEY environment variable:
$ export TD_API_KEY=(your TD API key)
$ docker run -it -e TD_API_KEY=$TD_API_KEY devtd/td-spark-shell:latest_spark3.1.1
This image automatically imports the td-spark package and sets the td variable as described below.
td: TDSparkContext
To use the functionality of td-spark, import com.treasuredata.spark._ and create a TDSparkContext (td) from the current Spark session (the spark variable):
scala> import com.treasuredata.spark._; val td = spark.td
This td variable (TDSparkContext) is the entry point for accessing Treasure Data. Now you are ready to read tables in TD with td.table("(db name).(table name)"):
// Read a table in TD as a Spark DataFrame
scala> val df = td.table("sample_datasets.www_access").df
// Show the content of the DataFrame
scala> df.show
Reading Tables
within: Specifying Time Ranges of Tables
Treasure Data is a time-series database and your data grows every day. To efficiently find the target portion of your data, it is important to specify a time range of the table. To do so, use td.table("table name") together with the within filtering method. After applying the filtering, you can create a DataFrame with the .df method.
For example, if you want to read the data collected during the last day, use .within("-1d") to narrow down the time range to scan:
val df = td.table("(database).(table)").within("-1d").df
This within(...) syntax supports the same interval string representations as TD_INTERVAL.
Here are several examples to specify time ranges of a table:
val t = td.table("sample_datasets.www_access")
// Using duration strings as in TD_INTERVAL
t.within("-1d") // last day
t.within("-1m/2014-10-03 09:13:00") // last 1 minute from a given offset
t.within("-1d", java.time.ZoneId.of("Asia/Tokyo")) // last day in JST tiemzone
// Specifying unix time ranges
// [from, until) unix time range
t.withinUnixTimeRange(from = 1412320845, until = 1412321000)
t.fromUnixTime(1412320845) // [from, ...)
t.untilUnixTime(1412321000) // [... , until)
// Using time strings yyyy-MM-dd HH:mm:ss (timezone default: UTC)
t.fromTime("2014-10-03 09:12:00") // [from, ...)
t.untilTime("2014-10-03 09:13:00") // [..., until)
// [from, until)
t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00")
// [from, until) in Asia/Tokyo timezone
t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo"))
// Specifying timezone
// [from, ...) in Asia/Tokyo timezone
t.fromTime("2014-10-03 09:12:00", ZoneId.of("Asia/Tokyo"))
// [..., until) in Asia/Tokyo timezone
t.untilTime("2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo"))
The following date time formats are supported in the from and until arguments of withinTimeRange:
yyyy-MM-dd HH:mm:ss zzz
e.g., 2017-01-01 00:00:00 UTC, 2017-01-01 00:00:00 America/Los_Angeles, 2017-01-01 00:00:00Z, etc.
yyyy-MM-dd
e.g., 2016-12-01. UTC timezone will be used
unix time
e.g., 1412327520. Unix time is the number of seconds since 1970-01-01 00:00:00Z
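For example, the date-only format can be used to read a single UTC day (a minimal sketch; the database, table, and dates are placeholders):
// Read only the rows of a single UTC day using the yyyy-MM-dd format (UTC by default)
val daily = td.table("(database).(table)")
  .withinTimeRange(from = "2016-12-01", until = "2016-12-02")
  .df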
Note: An access token for reading partition data expires in an hour by default, so if you have a long-running Spark job, you may receive the following error message:
The access token is expired: ... You may need to set a longer expiration time with TDTable.withExpirationSec(seconds) for td-spark.
To extend the expiration period beyond 3600 seconds (= 1 hour), use withExpirationSec:
val t = td.table("sample_datasets.www_access").withExpirationSec(7200)
DataFrames
After reading a table as a DataFrame, you can use Spark's functionality for manipulating table data. See also Spark SQL: Getting Started for more detailed usage of DataFrames.
scala> val df = td.table("sample_datasets.www_access").df
df: org.apache.spark.sql.DataFrame = [user: bigint, host: string, path: string, referer: string, code: bigint, agent: string, size: bigint, method: string, time: bigint]
scala> df.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 128.201.191.32| /category/office| /item/finance/4164| 200|Mozilla/5.0 (Wind...| 42| GET|1412351990|
|null| 40.129.109.180| /item/software/2265| -| 200|Mozilla/5.0 (Maci...| 48| GET|1412351976|
|null| 52.120.217.35| /category/toys| -| 200|Mozilla/5.0 (Wind...| 127| GET|1412351961|
|null| 52.36.173.175|/item/electronics...|http://www.google...| 200|Mozilla/5.0 (comp...| 135| GET|1412351947|
|null| 64.51.37.197| /category/software|/category/electro...| 200|Mozilla/5.0 (comp...| 62| GET|1412351932|
|null| 184.90.157.127|/item/electronics...|/search/?c=Electr...| 200|Mozilla/4.0 (comp...| 114| GET|1412351918|
|null| 172.207.112.61| /category/jewelry|/category/electro...| 200|Mozilla/5.0 (Wind...| 105| GET|1412351904|
|null| 168.186.74.157|/category/books?f...| /item/software/917| 200|Mozilla/5.0 (comp...| 110| GET|1412351889|
|null| 132.111.61.48|/category/office?...| -| 200|Mozilla/5.0 (Wind...| 135| GET|1412351875|
|null| 32.60.103.31| /category/books|/category/electro...| 200|Mozilla/5.0 (Wind...| 113| GET|1412351861|
|null| 192.78.111.80| /item/office/4848| -| 200|Mozilla/4.0 (comp...| 70| GET|1412351846|
|null| 40.195.100.76|/category/electro...| -| 200|Mozilla/5.0 (comp...| 41| GET|1412351832|
|null| 64.30.110.187| /category/jewelry| -| 200|Mozilla/5.0 (Wind...| 85| GET|1412351817|
|null|128.147.132.146| /category/jewelry|/category/electro...| 200|Mozilla/5.0 (comp...| 97| GET|1412351803|
|null| 40.90.71.68| /item/software/4079| /category/software| 200|Mozilla/5.0 (Wind...| 114| GET|1412351789|
|null| 72.81.86.154|/category/electro...|http://www.google...| 200|Mozilla/5.0 (Wind...| 129| GET|1412351774|
|null| 156.42.197.129| /category/books| -| 200|Mozilla/5.0 (Wind...| 133| GET|1412351760|
|null| 176.36.215.78|/category/electro...| -| 200|Mozilla/5.0 (Wind...| 84| GET|1412351746|
|null|204.117.145.144| /item/cameras/940|/category/electro...| 200|Mozilla/5.0 (Wind...| 126| GET|1412351731|
|null| 180.96.64.110|/item/electronics...| /category/games| 200|Mozilla/4.0 (comp...| 112| GET|1412351717|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows
scala> df.groupBy("method").count().show
+------+-----+
|method|count|
+------+-----+
| GET| 4624|
| POST| 376|
+------+-----+
scala> df.filter(df("method") === "POST").show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 156.93.73.140|/search/?c=Electr...| -| 200|Mozilla/4.0 (comp...| 79| POST|1412351010|
|null| 204.180.29.118|/search/?c=Electr...| -| 200|Mozilla/4.0 (comp...| 69| POST|1412350836|
|null|180.159.172.114| /search/?c=Books| -| 200|Mozilla/5.0 (comp...| 50| POST|1412350517|
|null| 96.24.207.218| /search/?c=Toys| -| 200|Mozilla/5.0 (comp...| 103| POST|1412350285|
|null| 172.159.143.86| /search/?c=Health| -| 200|Mozilla/5.0 (comp...| 86| POST|1412350212|
|null|124.192.113.194|/search/?c=Books+...| /search/?c=Software| 200|Mozilla/5.0 (Maci...| 72| POST|1412350140|
|null|164.147.150.199|/search/?c=Electr...|/item/electronics...| 200|Mozilla/5.0 (Maci...| 75| POST|1412350052|
|null| 68.108.188.24|/search/?c=Networ...| -| 200|Mozilla/5.0 (Maci...| 77| POST|1412350023|
|null| 204.210.70.32|/search/?c=Books+...| -| 200|Mozilla/5.0 (Wind...| 136| POST|1412350009|
|null| 80.30.170.153| /search/?c=Software| -| 200|Mozilla/5.0 (Wind...| 139| POST|1412349980|
|null| 212.33.136.97|/search/?c=Jewelr...| /category/software| 200|Mozilla/4.0 (comp...| 49| POST|1412349746|
|null| 224.165.78.90|/search/?c=Garden...|/search/?c=Books+...| 200|Mozilla/5.0 (Wind...| 100| POST|1412349294|
|null| 136.84.211.209|/search/?c=Electr...| /item/finance/3680| 200|Mozilla/5.0 (Wind...| 137| POST|1412349162|
|null| 100.93.27.59|/search/?c=Electr...| /category/cameras| 200|Mozilla/5.0 (comp...| 133| POST|1412348898|
|null| 220.216.212.78|/search/?c=Computers| -| 200|Mozilla/5.0 (Wind...| 42| POST|1412348796|
|null|100.177.172.122|/search/?c=Games+...| -| 200|Mozilla/5.0 (Wind...| 41| POST|1412348766|
|null| 20.102.185.162|/search/?c=Electr...| -| 200|Mozilla/5.0 (Wind...| 138| POST|1412348737|
|null| 132.54.226.209|/search/?c=Electr...| /category/software| 200|Mozilla/4.0 (comp...| 81| POST|1412348590|
|null| 156.228.103.39| /search/?c=Music| /item/books/2102| 200|Mozilla/5.0 (Wind...| 46| POST|1412348561|
|null| 24.84.40.51| /search/?c=Toys| -| 200|Mozilla/5.0 (Wind...| 90| POST|1412348531|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows
Querying Tables
Presto
You can also read Presto query results as DataFrames.
td.presto(sql): for streaming Presto query results to Spark.
td.prestoJob(sql): for running queries with large results. This supports internal query retries (e.g., on network connection failures).
td.executePresto(sql): for running non-query Presto statements, e.g., CREATE TABLE, DROP TABLE, etc.
scala> val df = td.presto("select * from www_access limit 10")
df: org.apache.spark.sql.DataFrame = [user: string, host: string, path: string, referer: string, code: bigint, agent: string, size: bigint, method: string, time: bigint]
scala> df.show
2016-07-11 16:47:20-0700 info [TDJobRelation]
Submitted presto query:
select * from www_access limit 10
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 76.45.175.151| /item/sports/4642|/search/?c=Sports...| 200|Mozilla/5.0 (Maci...| 137| GET|1412380793|
|null|184.162.105.153| /category/finance| -| 200|Mozilla/4.0 (comp...| 68| GET|1412380784|
|null| 144.30.45.112|/item/electronics...| /item/software/4777| 200|Mozilla/5.0 (Maci...| 136| GET|1412380775|
|null| 68.42.225.106|/category/networking|/category/electro...| 200|Mozilla/4.0 (comp...| 98| GET|1412380766|
|null| 104.66.194.210| /category/books| -| 200|Mozilla/4.0 (comp...| 43| GET|1412380757|
|null| 64.99.74.69| /item/finance/3775|/category/electro...| 200|Mozilla/5.0 (Wind...| 86| GET|1412380748|
|null| 136.135.51.168|/item/networking/540| -| 200|Mozilla/5.0 (Wind...| 89| GET|1412380739|
|null| 52.99.134.55| /item/health/1326|/category/electro...| 200|Mozilla/5.0 (Maci...| 51| GET|1412380730|
|null| 136.51.116.68|/category/finance...| -| 200|Mozilla/5.0 (comp...| 99| GET|1412380721|
|null|136.141.218.177| /item/computers/959| -| 200|Mozilla/5.0 (Wind...| 124| GET|1412380712|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
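For non-query statements, td.executePresto (listed above) can be used, for example to run Presto DDL. A minimal sketch with a hypothetical table name:
// Run Presto DDL statements that do not return a result set.
// The table name mydb.presto_test is a placeholder.
td.executePresto("CREATE TABLE IF NOT EXISTS mydb.presto_test (id bigint, name varchar)")
td.executePresto("DROP TABLE IF EXISTS mydb.presto_test")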
Hive
For running queries with Hive, use td.hiveJob(sql):
scala> val j = td.hiveJob("select 1")
2019-11-22 19:53:19.296Z info [TDSparkContext] - (TDSparkContext.scala:248)
Submitted job 622075532:
select 1
j: com.treasuredata.spark.TDJobRelation = TDJobRelation(TDSparkContext(local-1574451364721,spark.td.applicationId -> local-1574451364721, spark.td.local.dir -> /tmp),622075532,hive)
scala> j.jobId
res7: String = 622075532
scala> j.df.show
+---+
|_c0|
+---+
| 1|
+---+
Two-Step Job Execution
If you want to submit a long-running query and read the result later, use td.prestoJob(sql) (for Presto) or td.hiveJob(sql) (for Hive), then read the job result by specifying the job_id with td.jobResult(jobId):
// Submit a query
scala> val job = td.prestoJob("select 10")
2017-09-05 13:02:06-0700 info [TDSparkContext] - (TDSparkContext.scala:78)
Submitted job 171612415:
select 10
job: com.treasuredata.spark.TDJobRelation = TDJobRelation(TDSparkContext(local-1504641337677,spark.td.plazma_api.host -> api-plazma.treasuredata.com, spark.td.api.host -> api.treasuredata.com),171612415)
// This method will block until the job completes
scala> val result = td.jobResult(job.jobId)
result: org.apache.spark.sql.DataFrame = [_col0: bigint]
scala> result.show
2017-09-05 13:02:58-0700 info [TDJobRelation] Reading job 171612415 result - (TDJobRelation.scala:132)
+-----+
|_col0|
+-----+
| 10|
+-----+
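Because td.jobResult(jobId) only needs the job id string, the same call can read the result of a job submitted earlier, for example in a previous session. A minimal sketch; the job id below is a placeholder:
// Read the result of a previously submitted TD job by its id
val previousJobId = "171612415" // placeholder: use your own TD job id
val resultDf = td.jobResult(previousJobId)
resultDf.show()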
Spark SQL
It is also possible to process a SQL query entirely on the Spark cluster side by mapping tables in TD as temp views in Spark. Note that Spark SQL is generally slower than using Presto because it downloads the source table data to your Spark cluster before processing the query in Spark SQL.
// Register the DataFrame as a temporary view
scala> td.table("hb_tiny.rankings").df.createOrReplaceTempView("rankings")
scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]
scala> q1.show
2021-05-10 14:15:58.130Z info [PartitionScanner] Fetching the partition list of hb_tiny.rankings (Full Scan) - (PartitionScanner.scala:29)
2021-05-10 14:15:59.457Z info [PartitionScanner] Retrieved 2 partition entries - (PartitionScanner.scala:36)
+--------------------+---------+
| page_url|page_rank|
+--------------------+---------+
|xjhmjsuqolfklbvxn...| 251|
|seozvzwkcfgnfuzfd...| 165|
|fdgvmwbrjlmvuoquy...| 132|
|gqghyyardomubrfsv...| 108|
|qtqntqkvqioouwfuj...| 278|
|wrwgqnhxviqnaacnc...| 135|
| cxdmunpixtrqnvglnt| 146|
| ixgiosdefdnhrzqomnf| 126|
|xybwfjcuhauxiopfi...| 112|
|ecfuzdmqkvqktydvi...| 237|
|dagtwwybivyiuxmkh...| 177|
|emucailxlqlqazqru...| 134|
|nzaxnvjaqxapdjnzb...| 119|
| ffygkvsklpmup| 332|
|hnapejzsgqrzxdswz...| 171|
|rvbyrwhzgfqvzqkus...| 148|
|knwlhzmcyolhaccqr...| 104|
|nbizrgdziebsaecse...| 665|
|jakofwkgdcxmaaqph...| 187|
|kvhuvcjzcudugtidf...| 120|
+--------------------+---------+
only showing top 20 rows
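Multiple TD tables can also be registered as temp views and combined entirely in Spark SQL, for example with a join. The following is a sketch only; all table, view, and column names are placeholders:
// Register two TD tables as temp views and join them on the Spark side.
// The database, table, view, and column names are placeholders for illustration.
td.table("(database).(left_table)").df.createOrReplaceTempView("left_view")
td.table("(database).(right_table)").df.createOrReplaceTempView("right_view")

val joined = spark.sql("""
  SELECT l.id, l.value, r.label
  FROM left_view l
  JOIN right_view r ON l.id = r.id
""")
joined.show()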
Writing Tables
Uploading DataFrames to TD
To upload a DataFrame to TD, use df.createOrReplaceTD or df.insertIntoTD:
// This import statement is necessary to support createOrReplaceTD and insertIntoTD for your own DataFrames
import com.treasuredata.spark._
val df: DataFrame = ... // Prepare some DataFrame
// Writes DataFrame to a new TD table. If the table already exists, this will fail.
df.write.td("(database name).(table name)")
// Create or replace the target TD table with the contents of DataFrame
df.createOrReplaceTD("(database name).(table name)")
// Append the contents of DataFrame to the target TD table.
// If the table doesn't exist, it will create a new table
df.insertIntoTD("(database name).(table name)")
Using Save Mode of DataFrames
You can also use the full DataFrame.write and save syntax. Specify the format as com.treasuredata.spark, then select the target table with .option("table", "(target database).(table name)"):
df.write // df is an arbitrary DataFrame
.mode("overwrite") // append, overwrite, error, ignore
.format("com.treasuredata.spark")
.option("table", "(database name).(table name)") // Specify an upload taget table
.save
Available write modes (SaveMode)
append: Append to the target table. Throws an error if the table doesn’t exist.
overwrite: This performs a two-step update. First it deletes the target table if it exists, then it creates a new table with the new data.
error: (default) If the target table already exists, throws an exception.
ignore: If the target table exists, ignores the save operation.
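For instance, appending to an existing table with this data source syntax is a matter of changing the mode (a minimal sketch; the table name is a placeholder):
// Append df to an existing TD table; mode("append") fails if the table does not exist
df.write
  .mode("append")
  .format("com.treasuredata.spark")
  .option("table", "(database name).(table name)")
  .save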
Writing Data Using Spark SQL
Writing tables inside Spark SQL is supported. This is an example of creating a new table with CREATE TABLE AS (SELECT ...):
spark.sql("""
CREATE TABLE my_table
USING com.treasuredata.spark
OPTIONS(table '(database name).(table name)')
AS SELECT ...
""")
Creating New Databases and Tables
The following functions are supported for creating/dropping tables:
td.table(...).exists
td.table(...).dropIfExists
td.table(...).createIfNotExists
td.database(...).exists
td.database(...).dropIfExists
td.database(...).createIfNotExists
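A minimal sketch of using these functions together (the database and table names are placeholders):
// Make sure the target database and table exist before writing
td.database("(database name)").createIfNotExists
td.table("(database name).(table name)").createIfNotExists

// Drop a stale table only if it is present (no error if it is already gone)
td.table("(database name).(old_table)").dropIfExists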
Creating New UDP Tables
When you know that exact-match lookups on a column (e.g., id values) in the table will be necessary, use UDP (User-Defined Partitioning). You can create UDP tables with these methods:
td.createLongPartitionedTable("table_name", "(long type column name)")
td.createStringPartitionedTable("table_name", "(string type column name)")
Swapping Table Contents
This feature is available since td-spark 20.5.0.
You can swap the contents of two tables. The input tables must be in the same database:
// Swap the contents of two tables
td.swapTables("db1.table1", "db1.table2")
// Another way to swap tables
td.table("db1.table1").swapTableWith("table2")
Advanced Usage
Using multiple TD accounts
To access data in a different TD account from the one specified in td-spark.conf, use td.withApiKey(apikey):
// Returns a new TDSparkContext with the specified key
val td2 = td.withApiKey("key2")
For reading tables with the new key, use td2:
// Read a table with key2
val df = td2.table("sample_datasets.www_access").df
You can also specify a new key in the data source options:
df.write
.mode("overwrite")
.format("com.treasuredata.spark")
.option("table", "mydb.tbl1")
.option("apikey", "key2") // Specify a new api key
.save
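Since td.withApiKey(apikey) returns a full TDSparkContext, the other entry points described above (e.g., td2.presto) should also work under the new key. A minimal sketch; the query and table are just examples:
// Run a Presto query under the second account's API key
val countDf = td2.presto("select count(*) from sample_datasets.www_access")
countDf.show()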