# Configurations

## td-spark.conf

To use the td-spark library in your Spark applications, create a `td-spark.conf` file and pass it to the `--properties-file` option of the `spark-submit` (or `spark-shell`) command.

__td-spark.conf__

```
spark.td.site us
spark.td.apikey (your TD API key)
# Using KryoSerializer is recommended for faster performance
spark.serializer org.apache.spark.serializer.KryoSerializer
```

- `spark.td.site` can be one of `us`, `jp`, `eu01`, or `ap02`, depending on your TD account region.
- `spark.td.apikey` is your TD API key. To learn how to get TD API keys, see also: [Getting Your API Keys](https://docs.treasuredata.com/display/public/PD/Getting+Your+API+Keys).

To configure the JVM memory size for Spark, etc., add additional settings to the `td-spark.conf` file:

```
# default 1g
spark.driver.memory 2g
# default 1g
spark.executor.memory 2g
# (optional) If you see an error around the Kryo serializer
spark.kryoserializer.buffer.max 1g
```

See also the list of [available properties](https://spark.apache.org/docs/latest/configuration.html#available-properties) in Spark.

## Using Spark Shell

To start using the Spark shell, a REPL interface for Scala and Spark, two options are available: downloading Spark or using Docker.

### Downloading Spark

- Spark itself
  - Download a pre-compiled package of Spark from the [download page](https://spark.apache.org/downloads.html)
- td-spark-assembly.jar file
  - Download it from [Download](./release_notes.html#download)

To launch the Spark shell, specify the paths to your _td-spark-assembly.jar_ and _td-spark.conf_ files:

```shell
$ ./bin/spark-shell --jars (path to td-spark-assembly.jar) --properties-file (path to td-spark.conf)
```

### Docker

If you have Docker installed on your machine, you can quickly start using td-spark without manually downloading Spark or the td-spark library. Launch the td-spark shell by setting your TD API key as the `TD_API_KEY` environment variable:

```sh
$ export TD_API_KEY=(your TD API key)
$ docker run -it -e TD_API_KEY=$TD_API_KEY devtd/td-spark-shell:latest_spark3.1.1
```

This image automatically imports the td-spark package and sets the `td` variable as described below.

## td: TDSparkContext

To use the functionality of td-spark, import `com.treasuredata.spark._` and create a `TDSparkContext` (td) from the current Spark session (the `spark` variable):

```scala
scala> import com.treasuredata.spark._; val td = spark.td
```

This _td_ variable (TDSparkContext) is the entry point for accessing Treasure Data. Now you are ready to read tables in TD with `td.table("(db name).(table name)")`:

```scala
// Read a table in TD as a Spark DataFrame
scala> val df = td.table("sample_datasets.www_access").df

// Show the content of the DataFrame
scala> df.show
```

# Reading Tables

## within: Specifying Time Ranges of Tables

Treasure Data is a time-series database, and your data is growing every day. To effectively find a target portion of your data, __specifying a time range of the table__ is important. To do so, use `td.table("table name")` and the `within` filtering method. After applying the filtering, you can create a DataFrame with the `.df` method.
For example, if you want to read the data collected during the last day, use `.within("-1d")` to narrow down the time range to scan:

```scala
val df = td.table("(database).(table)").within("-1d").df
```

This `within(...)` syntax supports the same interval string representations as [TD_INTERVAL](https://docs.treasuredata.com/display/public/PD/Supported+Presto+and+TD+Functions#SupportedPrestoandTDFunctions-TD_INTERVAL).

Here are several examples of specifying time ranges of a table:

```scala
val t = td.table("sample_datasets.www_access")

// Using duration strings as in TD_INTERVAL
t.within("-1d")                                     // last day
t.within("-1m/2014-10-03 09:13:00")                 // last 1 minute from a given offset
t.within("-1d", java.time.ZoneId.of("Asia/Tokyo"))  // last day in JST timezone

// Specifying unix time ranges
// [from, until) unix time range
t.withinUnixTimeRange(from = 1412320845, until = 1412321000)
t.fromUnixTime(1412320845)   // [from, ...)
t.untilUnixTime(1412321000)  // [..., until)

// Using time strings yyyy-MM-dd HH:mm:ss (timezone: default = UTC)
t.fromTime("2014-10-03 09:12:00")   // [from, ...)
t.untilTime("2014-10-03 09:13:00")  // [..., until)
// [from, until)
t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00")
// [from, until) in Asia/Tokyo timezone
t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo"))

// Specifying timezone
// [from, ...) in Asia/Tokyo timezone
t.fromTime("2014-10-03 09:12:00", ZoneId.of("Asia/Tokyo"))
// [..., until) in Asia/Tokyo timezone
t.untilTime("2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo"))
```

The following date-time formats are supported in the from and until arguments of `withinTimeRange`:

- yyyy-MM-dd HH:mm:ss zzz
  - e.g., 2017-01-01 00:00:00 UTC, 2017-01-01 00:00:00 America/Los_Angeles, 2017-01-01 00:00:00Z, etc.
- yyyy-MM-dd
  - e.g., 2016-12-01. The UTC timezone will be used.
- unix time
  - e.g., 1412327520. Unix time is the number of seconds since 1970-01-01 00:00:00Z.

Note: An access token for reading partition data expires in an hour by default, so if you have a long-running Spark job, you may receive the following error message:

```
The access token is expired: ...
You may need to set a longer expiration time with TDTable.withExpirationSec(seconds) for td-spark.
```

To extend the expiration period beyond 3600 seconds (= 1 hour), use `withExpirationSec`:

```scala
val t = td.table("sample_datasets.www_access").withExpirationSec(7200)
```

## DataFrames

After reading a table as a DataFrame, you can use Spark's functionality for manipulating table data. See also [Spark SQL: Getting Started](https://spark.apache.org/docs/latest/sql-getting-started.html) for more detailed usage of DataFrames.
```scala
scala> val df = td.table("sample_datasets.www_access").df
df: org.apache.spark.sql.DataFrame = [user: bigint, host: string, path: string, referer: string, code: bigint, agent: string, size: bigint, method: string, time: bigint]

scala> df.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 128.201.191.32| /category/office| /item/finance/4164| 200|Mozilla/5.0 (Wind...| 42| GET|1412351990|
|null| 40.129.109.180| /item/software/2265| -| 200|Mozilla/5.0 (Maci...| 48| GET|1412351976|
|null| 52.120.217.35| /category/toys| -| 200|Mozilla/5.0 (Wind...| 127| GET|1412351961|
|null| 52.36.173.175|/item/electronics...|http://www.google...| 200|Mozilla/5.0 (comp...| 135| GET|1412351947|
|null| 64.51.37.197| /category/software|/category/electro...| 200|Mozilla/5.0 (comp...| 62| GET|1412351932|
|null| 184.90.157.127|/item/electronics...|/search/?c=Electr...| 200|Mozilla/4.0 (comp...| 114| GET|1412351918|
|null| 172.207.112.61| /category/jewelry|/category/electro...| 200|Mozilla/5.0 (Wind...| 105| GET|1412351904|
|null| 168.186.74.157|/category/books?f...| /item/software/917| 200|Mozilla/5.0 (comp...| 110| GET|1412351889|
|null| 132.111.61.48|/category/office?...| -| 200|Mozilla/5.0 (Wind...| 135| GET|1412351875|
|null| 32.60.103.31| /category/books|/category/electro...| 200|Mozilla/5.0 (Wind...| 113| GET|1412351861|
|null| 192.78.111.80| /item/office/4848| -| 200|Mozilla/4.0 (comp...| 70| GET|1412351846|
|null| 40.195.100.76|/category/electro...| -| 200|Mozilla/5.0 (comp...| 41| GET|1412351832|
|null| 64.30.110.187| /category/jewelry| -| 200|Mozilla/5.0 (Wind...| 85| GET|1412351817|
|null|128.147.132.146| /category/jewelry|/category/electro...| 200|Mozilla/5.0 (comp...| 97| GET|1412351803|
|null| 40.90.71.68| /item/software/4079| /category/software| 200|Mozilla/5.0 (Wind...| 114| GET|1412351789|
|null| 72.81.86.154|/category/electro...|http://www.google...| 200|Mozilla/5.0 (Wind...| 129| GET|1412351774|
|null| 156.42.197.129| /category/books| -| 200|Mozilla/5.0 (Wind...| 133| GET|1412351760|
|null| 176.36.215.78|/category/electro...| -| 200|Mozilla/5.0 (Wind...| 84| GET|1412351746|
|null|204.117.145.144| /item/cameras/940|/category/electro...| 200|Mozilla/5.0 (Wind...| 126| GET|1412351731|
|null| 180.96.64.110|/item/electronics...| /category/games| 200|Mozilla/4.0 (comp...| 112| GET|1412351717|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows

scala> df.groupBy("method").count().show
+------+-----+
|method|count|
+------+-----+
| GET| 4624|
| POST| 376|
+------+-----+

scala> df.filter(df("method") === "POST").show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 156.93.73.140|/search/?c=Electr...| -| 200|Mozilla/4.0 (comp...| 79| POST|1412351010|
|null| 204.180.29.118|/search/?c=Electr...| -| 200|Mozilla/4.0 (comp...| 69| POST|1412350836|
|null|180.159.172.114| /search/?c=Books| -| 200|Mozilla/5.0 (comp...| 50| POST|1412350517|
|null| 96.24.207.218| /search/?c=Toys| -| 200|Mozilla/5.0 (comp...| 103| POST|1412350285|
|null| 172.159.143.86| /search/?c=Health| -| 200|Mozilla/5.0 (comp...| 86| POST|1412350212|
|null|124.192.113.194|/search/?c=Books+...| /search/?c=Software| 200|Mozilla/5.0 (Maci...| 72| POST|1412350140|
|null|164.147.150.199|/search/?c=Electr...|/item/electronics...| 200|Mozilla/5.0 (Maci...| 75| POST|1412350052|
|null| 68.108.188.24|/search/?c=Networ...| -| 200|Mozilla/5.0 (Maci...| 77| POST|1412350023|
|null| 204.210.70.32|/search/?c=Books+...| -| 200|Mozilla/5.0 (Wind...| 136| POST|1412350009|
|null| 80.30.170.153| /search/?c=Software| -| 200|Mozilla/5.0 (Wind...| 139| POST|1412349980|
|null| 212.33.136.97|/search/?c=Jewelr...| /category/software| 200|Mozilla/4.0 (comp...| 49| POST|1412349746|
|null| 224.165.78.90|/search/?c=Garden...|/search/?c=Books+...| 200|Mozilla/5.0 (Wind...| 100| POST|1412349294|
|null| 136.84.211.209|/search/?c=Electr...| /item/finance/3680| 200|Mozilla/5.0 (Wind...| 137| POST|1412349162|
|null| 100.93.27.59|/search/?c=Electr...| /category/cameras| 200|Mozilla/5.0 (comp...| 133| POST|1412348898|
|null| 220.216.212.78|/search/?c=Computers| -| 200|Mozilla/5.0 (Wind...| 42| POST|1412348796|
|null|100.177.172.122|/search/?c=Games+...| -| 200|Mozilla/5.0 (Wind...| 41| POST|1412348766|
|null| 20.102.185.162|/search/?c=Electr...| -| 200|Mozilla/5.0 (Wind...| 138| POST|1412348737|
|null| 132.54.226.209|/search/?c=Electr...| /category/software| 200|Mozilla/4.0 (comp...| 81| POST|1412348590|
|null| 156.228.103.39| /search/?c=Music| /item/books/2102| 200|Mozilla/5.0 (Wind...| 46| POST|1412348561|
|null| 24.84.40.51| /search/?c=Toys| -| 200|Mozilla/5.0 (Wind...| 90| POST|1412348531|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows
```

# Querying Tables

## Presto

You can also read Presto query results as DataFrames:

- `td.presto(sql)`: For streaming Presto query results to Spark.
- `td.prestoJob(sql)`: For running queries with large results. This supports internal query retries (e.g., on network connection failures).
- `td.executePresto(sql)`: For running non-query Presto statements, e.g., CREATE TABLE, DROP TABLE, etc. (see the sketch below).
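For illustration, a non-query statement can be issued with `td.executePresto` as in the following minimal sketch; the database and table names are placeholders and the DDL is ordinary Presto syntax:

```scala
// Create a table with a Presto DDL statement (no result DataFrame is needed)
td.executePresto("CREATE TABLE IF NOT EXISTS mydb.new_table (id bigint, name varchar)")

// Drop the table when it is no longer needed
td.executePresto("DROP TABLE IF EXISTS mydb.new_table")
```

For regular queries, `td.presto` streams the query result back as a DataFrame: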
```scala
scala> val df = td.presto("select * from www_access limit 10")
df: org.apache.spark.sql.DataFrame = [user: string, host: string, path: string, referer: string, code: bigint, agent: string, size: bigint, method: string, time: bigint]

scala> df.show
2016-07-11 16:47:20-0700 info [TDJobRelation] Submitted presto query: select * from www_access limit 10
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 76.45.175.151| /item/sports/4642|/search/?c=Sports...| 200|Mozilla/5.0 (Maci...| 137| GET|1412380793|
|null|184.162.105.153| /category/finance| -| 200|Mozilla/4.0 (comp...| 68| GET|1412380784|
|null| 144.30.45.112|/item/electronics...| /item/software/4777| 200|Mozilla/5.0 (Maci...| 136| GET|1412380775|
|null| 68.42.225.106|/category/networking|/category/electro...| 200|Mozilla/4.0 (comp...| 98| GET|1412380766|
|null| 104.66.194.210| /category/books| -| 200|Mozilla/4.0 (comp...| 43| GET|1412380757|
|null| 64.99.74.69| /item/finance/3775|/category/electro...| 200|Mozilla/5.0 (Wind...| 86| GET|1412380748|
|null| 136.135.51.168|/item/networking/540| -| 200|Mozilla/5.0 (Wind...| 89| GET|1412380739|
|null| 52.99.134.55| /item/health/1326|/category/electro...| 200|Mozilla/5.0 (Maci...| 51| GET|1412380730|
|null| 136.51.116.68|/category/finance...| -| 200|Mozilla/5.0 (comp...| 99| GET|1412380721|
|null|136.141.218.177| /item/computers/959| -| 200|Mozilla/5.0 (Wind...| 124| GET|1412380712|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
```

## Hive

For running queries with Hive, use `td.hiveJob(sql)`:

```scala
scala> val j = td.hiveJob("select 1")
2019-11-22 19:53:19.296Z info [TDSparkContext] - (TDSparkContext.scala:248) Submitted job 622075532: select 1
j: com.treasuredata.spark.TDJobRelation = TDJobRelation(TDSparkContext(local-1574451364721,spark.td.applicationId -> local-1574451364721, spark.td.local.dir -> /tmp),622075532,hive)

scala> j.jobId
res7: String = 622075532

scala> j.df.show
+---+
|_c0|
+---+
| 1|
+---+
```

## Two-Step Job Execution

If you want to submit a long-running query and read the result later, use `td.prestoJob(sql)` (for Presto) or `td.hiveJob(sql)` (for Hive), then read the job result by passing the job_id to `td.jobResult(jobId)`:

```scala
// Submit a query
scala> val job = td.prestoJob("select 10")
2017-09-05 13:02:06-0700 info [TDSparkContext] - (TDSparkContext.scala:78) Submitted job 171612415: select 10
job: com.treasuredata.spark.TDJobRelation = TDJobRelation(TDSparkContext(local-1504641337677,spark.td.plazma_api.host -> api-plazma.treasuredata.com, spark.td.api.host -> api.treasuredata.com),171612415)

// This method will block until the job completes
scala> val result = td.jobResult(job.jobId)
result: org.apache.spark.sql.DataFrame = [_col0: bigint]

scala> result.show
2017-09-05 13:02:58-0700 info [TDJobRelation] Reading job 171612415 result - (TDJobRelation.scala:132)
+-----+
|_col0|
+-----+
| 10|
+-----+
```
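Since `td.jobResult` only needs a job ID, you can also keep the submitted job's ID and read the result afterwards. Here is a minimal sketch based on the methods shown above; the query is only an example:

```scala
// Submit the query and keep the job ID (a String) for later use
val jobId = td.prestoJob("select count(*) from sample_datasets.www_access").jobId

// Later, fetch the result of that job; this call blocks until the job completes
val result = td.jobResult(jobId)
result.show
```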
## Spark SQL

It is also possible to process SQL queries entirely on the Spark cluster side by mapping tables in TD to temp views in Spark. Note that Spark SQL is generally slower than using Presto, because it downloads the source table data to your Spark cluster before processing the query.

```scala
// Register the DataFrame as a temporary view
scala> td.table("hb_tiny.rankings").df.createOrReplaceTempView("rankings")

scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]

scala> q1.show
2021-05-10 14:15:58.130Z info [PartitionScanner] Fetching the partition list of hb_tiny.rankings (Full Scan) - (PartitionScanner.scala:29)
2021-05-10 14:15:59.457Z info [PartitionScanner] Retrieved 2 partition entries - (PartitionScanner.scala:36)
+--------------------+---------+
| page_url|page_rank|
+--------------------+---------+
|xjhmjsuqolfklbvxn...| 251|
|seozvzwkcfgnfuzfd...| 165|
|fdgvmwbrjlmvuoquy...| 132|
|gqghyyardomubrfsv...| 108|
|qtqntqkvqioouwfuj...| 278|
|wrwgqnhxviqnaacnc...| 135|
| cxdmunpixtrqnvglnt| 146|
| ixgiosdefdnhrzqomnf| 126|
|xybwfjcuhauxiopfi...| 112|
|ecfuzdmqkvqktydvi...| 237|
|dagtwwybivyiuxmkh...| 177|
|emucailxlqlqazqru...| 134|
|nzaxnvjaqxapdjnzb...| 119|
| ffygkvsklpmup| 332|
|hnapejzsgqrzxdswz...| 171|
|rvbyrwhzgfqvzqkus...| 148|
|knwlhzmcyolhaccqr...| 104|
|nbizrgdziebsaecse...| 665|
|jakofwkgdcxmaaqph...| 187|
|kvhuvcjzcudugtidf...| 120|
+--------------------+---------+
only showing top 20 rows
```

# Writing Tables

## Uploading DataFrames to TD

To upload a DataFrame to TD, use `df.write.td`, `df.createOrReplaceTD`, or `df.insertIntoTD`:

```scala
// This import statement is necessary to support createOrReplaceTD and insertIntoTD for your own DataFrames
import com.treasuredata.spark._

val df: DataFrame = ... // Prepare some DataFrame

// Writes the DataFrame to a new TD table. If the table already exists, this will fail.
df.write.td("(database name).(table name)")

// Create or replace the target TD table with the contents of the DataFrame
df.createOrReplaceTD("(database name).(table name)")

// Append the contents of the DataFrame to the target TD table.
// If the table doesn't exist, a new table will be created.
df.insertIntoTD("(database name).(table name)")
```

## Using Save Mode of DataFrames

You can also use the full `DataFrame.write` and `save` syntax. Specify the format as `com.treasuredata.spark`, then select the target table with `.option("table", "(target database).(table name)")`:

```scala
df.write                                            // df is an arbitrary DataFrame
  .mode("overwrite")                                // append, overwrite, error, ignore
  .format("com.treasuredata.spark")
  .option("table", "(database name).(table name)")  // Specify the upload target table
  .save
```

### Available write modes (SaveMode)

- append: Append to the target table. Throws an error if the table doesn't exist.
- overwrite: Performs a two-step update: first it deletes the target table if it exists, then creates a new table with the new data.
- error: (default) If the target table already exists, throws an exception.
- ignore: If the target table exists, ignores the save operation.

## Writing Data Using Spark SQL

Writing tables from within Spark SQL is also supported. This is an example of creating a new table with `CREATE TABLE AS (SELECT ...)`:

```scala
spark.sql("""
CREATE TABLE my_table
USING com.treasuredata.spark
OPTIONS(table '(database name).(table name)')
AS SELECT ...
""")
```

## Creating New Databases and Tables

The following functions are supported for creating and dropping databases and tables:

```
td.table(...).exists
td.table(...).dropIfExists
td.table(...).createIfNotExists

td.database(...).exists
td.database(...).dropIfExists
td.database(...).createIfNotExists
```
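For illustration, these methods can be combined as in the following minimal sketch, assuming `td.database` takes a database name and `td.table` takes a `(db name).(table name)` string as in the examples above; `my_db` and `my_table` are placeholder names:

```scala
// Create the database and an empty table only if they don't exist yet
td.database("my_db").createIfNotExists
td.table("my_db.my_table").createIfNotExists

// Check whether the table exists, and drop it if it does
val tableExists = td.table("my_db.my_table").exists
td.table("my_db.my_table").dropIfExists
```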
""") ``` ## Creating New Databases and Tables The following functions are supported for creating/dropping tables: ``` td.table(...).exists td.table(...).dropIfExists td.table(...).createIfNotExists td.database(...).exists td.database(...).dropIfExists td.database(...).createIfNotExists ``` ## Creating New UDP Tables When you know using an exact match to a column (e.g., id values) in the table is necessary, using [UDP (User-Defined Partitioning)](https://docs.treasuredata.com/display/public/PD/Defining+Partitioning+for+Presto). You can create UDP tables with these methods: ``` td.createLongPartitionedTable("table_name", "(long type column name)") td.createStringPartitionedTable("table_name", "(string type column name)") ``` ## Swapping Table Contents This feature is available since td-spark 20.5.0. You can replace the contents of two tables. The input tables must be in the same database: ``` // Swap the contents of two tables td.swapTables("db1.table1", "db1.table2") // Another way to swap tables td.table("db1.table1").swapTableWith("table2") ``` # Advanced Usage ## Using multiple TD accounts To access data in a diffeerent TD account from the one specified in td-spark.conf, use `td.withApiKey(apikey)`: ```scala // Returns a new TDSparkContext with the specified key val td2 = td.withApiKey("key2") ``` For reading tables with the new key, use `td2`: ```scala // Read a table with key2 val df = td2.table("sample_datasets.www_access").df ``` You can also specify a new key in the data source options: ```scala df.write .mode("overwrite") .format("com.treasuredata.spark") .option("table", "mydb.tbl1") .option("apikey", "key2") // Specify a new api key .save ```