# Getting Started: td-pyspark

[Treasure Data](https://treasuredata.com) extension for using [pyspark](https://spark.apache.org/docs/latest/api/python/index.html).

## Installation

You can install td-pyspark from PyPI by using `pip` as follows:

```sh
$ pip install td-pyspark
```

If you want to install PySpark via PyPI as well, you can install it as:

```sh
$ pip install td-pyspark[spark]
```

## Introduction

First, contact [support@treasure-data.com](mailto:support@treasure-data.com) to enable the td-spark feature. This feature is disabled by default.

td-pyspark is a library that enables Python to access tables in Treasure Data. The features of td_pyspark include:

- Reading tables in Treasure Data as DataFrames
- Writing DataFrames to Treasure Data
- Submitting Presto queries and reading the query results as DataFrames

For more details, see also [td-spark FAQs](https://docs.treasuredata.com/display/public/PD/Apache+Spark+Driver+%28td-spark%29+FAQs).

### Quick Start with Docker

You can try td_pyspark using Docker without installing Spark or Python. First, create a __td-spark.conf__ file and set your TD API key and site (us, jp, eu01, ap02):

__td-spark.conf__
```
spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01, ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.pyspark.enabled true
```

Launch the PySpark Docker image. This image already has the td_pyspark library pre-installed:

```shell
$ docker run -it -e TD_SPARK_CONF=td-spark.conf -v $(pwd):/opt/spark/work devtd/td-spark-pyspark:latest_spark3.1.1
Python 3.9.2 (default, Feb 19 2021, 17:33:48)
[GCC 10.2.1 20201203] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/05/10 09:04:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Python version 3.9.2 (default, Feb 19 2021 17:33:48)
SparkSession available as 'spark'.
2021-05-10 09:04:53.268Z debug [spark] Loading com.treasuredata.spark package - (package.scala:23)
...
>>>
```

Try reading a sample table by specifying a time range:

```python
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()
2021-05-10 09:07:40.233Z info [PartitionScanner] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-06 00:00:00Z) - (PartitionScanner.scala:29)
2021-05-10 09:07:42.262Z info [PartitionScanner] Retrieved 2 partition entries - (PartitionScanner.scala:36)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|192.225.229.196|  /category/software|                   -| 200|Mozilla/5.0 (Maci...| 117|   GET|1412382292|
|null|120.168.215.131|  /category/software|                   -| 200|Mozilla/5.0 (comp...|  53|   GET|1412382284|
|null|180.198.173.136|/category/electro...| /category/computers| 200|Mozilla/5.0 (Wind...| 106|   GET|1412382275|
|null| 140.168.145.49|   /item/garden/2832|      /item/toys/230| 200|Mozilla/5.0 (Maci...| 122|   GET|1412382267|
|null|  52.168.78.222|/category/electro...|    /item/games/2532| 200|Mozilla/5.0 (comp...|  73|   GET|1412382259|
|null|  32.42.160.165|   /category/cameras|/category/cameras...| 200|Mozilla/5.0 (Wind...| 117|   GET|1412382251|
|null|   48.204.59.23|  /category/software|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...|  52|   GET|1412382243|
|null|136.207.150.227|/category/electro...|                   -| 200|Mozilla/5.0 (iPad...| 120|   GET|1412382234|
|null| 204.21.174.187|   /category/jewelry|   /item/office/3462| 200|Mozilla/5.0 (Wind...|  59|   GET|1412382226|
|null|  224.198.88.93|    /category/office|     /category/music| 200|Mozilla/4.0 (comp...|  46|   GET|1412382218|
|null|   96.54.24.116|     /category/games|                   -| 200|Mozilla/5.0 (Wind...|  40|   GET|1412382210|
|null| 184.42.224.210| /category/computers|                   -| 200|Mozilla/5.0 (Wind...|  95|   GET|1412382201|
|null|  144.72.47.212|/item/giftcards/4684|    /item/books/1031| 200|Mozilla/5.0 (Wind...|  65|   GET|1412382193|
|null| 40.213.111.170|     /item/toys/1085|   /category/cameras| 200|Mozilla/5.0 (Wind...|  65|   GET|1412382185|
|null| 132.54.226.209|/item/electronics...|  /category/software| 200|Mozilla/5.0 (comp...| 121|   GET|1412382177|
|null|  108.219.68.64|/category/cameras...|                   -| 200|Mozilla/5.0 (Maci...|  54|   GET|1412382168|
|null| 168.66.149.218| /item/software/4343|  /category/software| 200|Mozilla/4.0 (comp...| 139|   GET|1412382160|
|null|  80.66.118.103|  /category/software|                   -| 200|Mozilla/4.0 (comp...|  92|   GET|1412382152|
|null|140.171.147.207|     /category/music|   /category/jewelry| 200|Mozilla/5.0 (Wind...| 119|   GET|1412382144|
|null| 84.132.164.204| /item/software/4783|/category/electro...| 200|Mozilla/5.0 (Wind...| 137|   GET|1412382135|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows

>>>
```

## Usage

_TDSparkContext_ is an entry point to access td_pyspark's functionalities. To create a TDSparkContext, pass your SparkSession (`spark`) to TDSparkContext:

```python
from td_pyspark import TDSparkContext

td = TDSparkContext(spark)
```

### Reading Tables as DataFrames

To read a table, use `td.table(table name)`:

```python
df = td.table("sample_datasets.www_access").df()
df.show()
```

To change the context database, use `td.use(database_name)`:

```python
td.use("sample_datasets")
# Accesses sample_datasets.www_access
df = td.table("www_access").df()
```

By calling `.df()`, your table data will be read as a Spark DataFrame. The usage of the DataFrame is the same as in PySpark. See also the [PySpark DataFrame documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).
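Because the result is an ordinary PySpark DataFrame, all standard transformations apply. The following is a minimal sketch (not part of the td_pyspark API itself) that counts GET requests per path in the sample table shown above:

```python
# Read the table as a regular PySpark DataFrame
df = td.table("sample_datasets.www_access").df()

# Standard DataFrame operations: filter, aggregate, and sort
top_paths = (
    df.filter(df.method == "GET")
      .groupBy("path")
      .count()
      .orderBy("count", ascending=False)
)
top_paths.show(10)
```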
#### Specifying Time Ranges

Treasure Data is a time-series database, so reading recent data by specifying a time range is important to reduce the amount of data to be processed. The `.within(...)` function can be used to specify a target time range in a concise syntax. `within` accepts the same syntax used in the [TD_INTERVAL function in Presto](https://docs.treasuredata.com/display/public/PD/Supported+Presto+and+TD+Functions#SupportedPrestoandTDFunctions-TD_INTERVAL).

For example, to read the last 1 hour of data, use `within("-1h")`:

```python
td.table("tbl").within("-1h").df()
```

You can also read the last day's data:

```python
td.table("tbl").within("-1d").df()
```

You can also specify an _offset_ for the relative time range. This example reads the last day's data beginning from 7 days ago:

```python
td.table("tbl").within("-1d/-7d").df()
```

If you know an exact time range, `within("(start time)/(end time)")` is useful:

```python
>>> df = td.table("sample_datasets.www_access").within("2014-10-04/2014-10-05").df()
>>> df.show()
2021-05-10 09:10:02.366Z info [PartitionScanner] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-05 00:00:00Z) - (PartitionScanner.scala:29)
...
```

See [this doc](https://docs.treasuredata.com/display/public/PD/Supported+Presto+and+TD+Functions#SupportedPrestoandTDFunctions-TD_INTERVAL) for more examples of interval strings.

#### Submitting Presto Queries

If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult. In this case, you can utilize Presto, a distributed SQL query engine, to reduce the amount of data processed in PySpark:

```python
>>> q = td.presto("select code, count(*) cnt from sample_datasets.www_access group by 1")
>>> q.show()
2019-06-13 20:09:13.245Z info [TDPrestoJDBCRDD] - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500|   2|
| 404|  17|
+----+----+
```

The query result is represented as a DataFrame.

To run non-query statements (e.g., INSERT INTO, CREATE TABLE, etc.), use `execute_presto(sql)`:

```python
td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")
```

#### Using SparkSQL

To use tables in Treasure Data inside Spark SQL, create a view with `df.createOrReplaceTempView(...)`:

```python
# Read a TD table as a DataFrame
df = td.table("mydb.test1").df()
# Register the DataFrame as a view
df.createOrReplaceTempView("test1")

spark.sql("SELECT * FROM test1").show()
```
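Views registered this way can be combined freely in a single SQL statement. Below is a minimal sketch, assuming two hypothetical tables `mydb.users` and `mydb.orders` (the table names and columns are placeholders, not part of the sample datasets):

```python
# Register two TD-backed DataFrames as views (hypothetical tables)
td.table("mydb.users").df().createOrReplaceTempView("users")
td.table("mydb.orders").within("-1d").df().createOrReplaceTempView("orders")

# Join them with plain Spark SQL
spark.sql("""
    SELECT u.id, COUNT(*) AS order_count
    FROM orders o
    JOIN users u ON o.user_id = u.id
    GROUP BY u.id
""").show()
```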
### Create or Drop Databases and Tables

Create a new table or database:

```python
td.create_database_if_not_exists("mydb")
td.create_table_if_not_exists("mydb.test1")
```

Delete unnecessary tables:

```python
td.drop_table_if_exists("mydb.test1")
td.drop_database_if_exists("mydb")
```

You can also check the presence of a table:

```python
td.table("mydb.test1").exists() # True if the table exists
```

### Create User-Defined Partition Tables

User-defined partitioning ([UDP](https://docs.treasuredata.com/display/public/PD/Defining+Partitioning+for+Presto)) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values).

You can create a UDP table partitioned by `id` (a string type column) as follows:

```python
td.create_udp_s("mydb.user_list", "id")
```

To create a UDP table partitioned by a long (bigint) type column, use `td.create_udp_l`:

```python
td.create_udp_l("mydb.departments", "dept_id")
```

### Swapping Table Contents

You can replace the contents of two tables. The input tables must be in the same database:

```python
# Swap the contents of two tables
td.swap_tables("mydb.tbl1", "mydb.tbl2")

# Another way to swap tables
td.table("mydb.tbl1").swap_table_with("tbl2")
```

### Uploading DataFrames to Treasure Data

To save your local DataFrames as a table, use `td.insert_into(df, table)` or `td.create_or_replace(df, table)`:

```python
# Insert the records in the input DataFrame into the target table:
td.insert_into(df, "mydb.tbl1")

# Create or replace the target table with the content of the input DataFrame:
td.create_or_replace(df, "mydb.tbl2")
```
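For example, a DataFrame built from local records can be uploaded the same way. This is a minimal sketch; the table name `mydb.scores` and its columns are placeholders, and a `time` column is included because Treasure Data tables conventionally carry one:

```python
import time

# Build a small DataFrame locally (placeholder columns and values)
now = int(time.time())
records = [(now, "user1", 42), (now, "user2", 17)]
df = spark.createDataFrame(records, ["time", "name", "score"])

# Create or replace the placeholder target table with this DataFrame
td.create_or_replace(df, "mydb.scores")
```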
### Using multiple TD accounts

To use an API key other than the one configured in td-spark.conf, use `td.with_apikey(apikey)`:

```python
# Returns a new TDSparkContext with the specified key
td2 = td.with_apikey("key2")
```

For reading tables or uploading DataFrames with the new key, use `td2`:

```python
# Read a table with key2
df = td2.table("sample_datasets.www_access").df()
...
# Insert the records with key2
td2.insert_into(df, "mydb.tbl1")
```

### Running PySpark jobs with spark-submit

To submit your PySpark script to a Spark cluster, you will need the following files:

- __td-spark.conf__ file that describes your TD API key and `spark.td.site` (see above).
- __td_pyspark.py__
  - Check the file location using `pip show -f td-pyspark`, and copy td_pyspark.py to your preferred location.
- __td-spark-assembly-latest_xxxx.jar__
  - Get the latest version from the [Download](https://treasure-data.github.io/td-spark/release_notes.html#download) page.
- Pre-built Spark
  - [Download Spark 3.4.2](https://spark.apache.org/downloads.html) with Hadoop 3.3 (built for Scala 2.12).
  - Extract the downloaded archive. This folder location will be your `$SPARK_HOME`.

Here is an example PySpark application code:

__my_app.py__
```python
import td_pyspark
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .appName("myapp")\
    .getOrCreate()

# Create TDSparkContext
td = td_pyspark.TDSparkContext(spark)

# Read the table data within -1d (yesterday) range as a DataFrame
df = td.table("sample_datasets.www_access").within("-1d").df()
df.show()
```

To run `my_app.py`, use spark-submit and specify the necessary files mentioned above:

```bash
# Launching PySpark with the local mode
$ ${SPARK_HOME}/bin/spark-submit --master "local[4]"\
  --driver-class-path td-spark-assembly.jar\
  --properties-file=td-spark.conf\
  --py-files td_pyspark.py\
  my_app.py
```

`local[4]` means running a Spark cluster locally using 4 threads. To use a remote Spark cluster, specify the `master` address, e.g., `--master=spark://(master node IP address):7077`.

### Using the td-spark assembly included in the PyPI package

The package contains a pre-built binary of td-spark so that you can add it to the classpath by default. `TDSparkContextBuilder.default_jar_path()` returns the path to the default td-spark-assembly.jar file. Passing this path to the `jars` method of `TDSparkContextBuilder` will automatically build a SparkSession that includes the default jar.

```python
import td_pyspark
from pyspark.sql import SparkSession

builder = SparkSession\
    .builder\
    .appName("td-pyspark-app")

td = td_pyspark.TDSparkContextBuilder(builder)\
    .apikey("XXXXXXXXXXXXXX")\
    .jars(td_pyspark.TDSparkContextBuilder.default_jar_path())\
    .build()
```
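The returned `td` is a regular TDSparkContext, so the earlier examples apply unchanged. A brief usage sketch:

```python
# Use the built context exactly as in the previous sections
df = td.table("sample_datasets.www_access").within("-1h").df()
df.show()
```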