td-pyspark: API Reference
TDSparkContext
- class td_pyspark.td_pyspark.TDSparkContext(spark, td=None)
Treasure Data Spark Context
- __init__(spark, td=None)
- Parameters:
spark (pyspark.sql.SparkSession) – SparkSession already connected to Spark.
td (TDSparkContext, optional) – Treasure Data Spark Context.
- df(table)
Load Treasure Data table into Spark DataFrame
- Parameters:
table (str) – Table name of Treasure Data.
- Returns:
Loaded table data.
- Return type:
pyspark.sql.DataFrame
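- Example:
A minimal sketch, not part of the original reference; it assumes TD_API_KEY is set and that the public sample_datasets.www_access table is readable from your account:
>>> import os
>>> from td_pyspark import TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> # load the whole table as a pyspark.sql.DataFrame
>>> access = td.df("sample_datasets.www_access")
>>> access.printSchema()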
- presto(sql, database=None)
Submit Presto Query
- Parameters:
sql (str) – SQL statement to be executed.
database (str, optional) – Target database name.
- Returns:
SQL result
- Return type:
pyspark.sql.DataFrame
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> sql = "select code, count(*) cnt from sample_datasets.www_access group by 1"
>>> q = td.presto(sql)
>>> q.show()
2019-06-13 20:09:13.245Z info [TDPrestoJDBCRDD] - (TDPrestoRelation.scala:106) Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500|   2|
| 404|  17|
+----+----+
- execute_presto(sql, database=None)
Run non-query statements (e.g., INSERT INTO, CREATE TABLE)
- Parameters:
sql (str) – SQL statement to be executed.
database (str, optional) – Target database name.
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")
- table(table)
Fetch TreasureData table
- Parameters:
table (str) – Table name
- Returns:
TD table data. Call the df() method to use it as a pyspark.sql.DataFrame.
- Return type:
TDTable
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("sample_datasets.www_access")
<td_pyspark.td_pyspark.TDTable object at 0x10eedf240>
- db(name)
Fetch TreasureData database
- Parameters:
name (str) – Database name
- Returns:
TD database object. Use its table() method to fetch a table, then call df() on the table to obtain a pyspark.sql.DataFrame.
- Return type:
TDDatabase
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.db("sample_datasets")
<td_pyspark.td_pyspark.TDDatabase object at 0x10eedfa58>
- set_log_level(log_level)
Set log level for Spark
- Parameters:
log_level (str) – Log level for Spark process. {“ALL”, “DEBUG”, “ERROR”, “FATAL”, “INFO”, “OFF”, “TRACE”, “WARN”}
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.set_log_level("DEBUG")
2019-09-06T18:19:12.398-0700 info [TDSparkContext] Setting the log level of com.treasuredata.spark to DEBUG - (TDSparkContext.scala:62)
- use(name)
Change current database
- Parameters:
name (str) – Target database name to be changed.
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.use("mydb")
2019-09-06T18:19:49.469-0700 info [TDSparkContext] Use mydb - (TDSparkContext.scala:150)
- with_apikey(apikey)
Set an additional apikey
- Parameters:
apikey (str) – apikey for TreasureData
- Example:
>>> from td_pyspark import TDSparkContext
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> td2 = td.with_apikey("key2")
- write(df, table_name, mode='error')
Write a DataFrame as a TreasureData table
- Parameters:
df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.
table_name (str) – Target table name to be inserted.
mode (str, optional) –
Save mode, same as Spark. {“error”, “overwrite”, “append”, “ignore”}
error: raise an exception.
overwrite: drop the existing table, recreate it, and insert data.
append: insert data. Create the table if it does not exist.
ignore: do nothing.
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.write(df, "mydb.table1", "error")
- insert_into(df, table_name)
Insert a DataFrame into an existing TreasureData table
- Parameters:
df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.
table_name (str) – Target table name to be inserted.
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.insert_into(df, "mydb.table1")
2019-09-09T10:57:37.558-0700 info [TDWriter] Uploading data to mydb.table1 (mode: Append) - (TDWriter.scala:66)
2019-09-09T10:57:38.187-0700 info [TDWriter] [txx:8184891a] Starting a new transaction for updating mydb.table1 - (TDWriter.scala:95)
2019-09-09T10:57:42.897-0700 info [TDWriter] [txx:8184891a] Finished uploading 1 partitions (1 records, size:132B) to mydb.table1 - (TDWriter.scala:132)
- create_or_replace(df, table_name)
Create or replace a TreasureData table with a DataFrame
- Parameters:
df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.
table_name (str) – Target table name to be ingested.
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.create_or_replace(df, "mydb.table1")
2019-09-09T10:57:56.381-0700 warn [DefaultSource] Dropping mydb.table1 (Overwrite mode) - (DefaultSource.scala:94)
2019-09-09T10:57:56.923-0700 info [TDWriter] Uploading data to mydb.table1 (mode: Overwrite) - (TDWriter.scala:66)
2019-09-09T10:57:57.106-0700 info [TDWriter] [txx:a69bce97] Starting a new transaction for updating aki.tds_test - (TDWriter.scala:95)
2019-09-09T10:57:59.179-0700 info [TDWriter] [txx:a69bce97] Finished uploading 1 partitions (1 records, size:132B) to aki.tds_test - (TDWriter.scala:132)
- create_table_if_not_exists(table_name)
Create a table if not exists
- Parameters:
table_name (str) – Target table name to be created.
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_table_if_not_exists("mydb.table1")
2019-09-09T13:43:41.142-0700 warn [TDTable] Creating table aki.tds_test if not exists - (TDTable.scala:67)
- drop_table_if_exists(table_name)
Drop a table if exists
- Parameters:
table_name (str) – Target table name to be dropped.
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_table_if_exists("mydb.table1")
- create_database_if_not_exists(db_name)
Create a database if not exists
- Parameters:
db_name (str) – Target database name to be created.
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_database_if_not_exists("mydb")
- drop_database_if_exists(db_name)
Drop a database if exists
- Parameters:
db_name (str) – Target database name to be dropped
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_database_if_exists("mydb")
- create_udp_l(table_name, long_column_name)
Create a User-Defined Partition Table partitioned by a Long type column
User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by Long type column.
- Parameters:
table_name (str) – Target table name to be created as a UDP table.
long_column_name (str) – Name of the partition column, which must be a Long (bigint) type column.
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_l("mydb.departments", "dept_id")
2019-09-09T10:43:20.913-0700 info [UDP] - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."departments" (
  time bigint,
  "dept_id" bigint
)
with (
  bucketed_on = array['dept_id'],
  bucket_count = 512
)
- create_udp_s(table_name, string_column_name)
Create a User-Defined Partition Table partitioned by a string type column
User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by string type column.
- Parameters:
table_name (str) – Target table name to be created as a UDP table.
string_column_name (str) – Name of the partition column, which must be a string type column.
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_s("mydb.user_list", "id")
2019-09-09T10:45:27.802-0700 info [UDP] - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."user_list" (
  time bigint,
  "id" varchar
)
with (
  bucketed_on = array['id'],
  bucket_count = 512
)
- swap_tables(table1, table2)
Swap table contents within the same database.
- Parameters:
table1 (str) – First target table name (e.g., “mydb.tbl1”).
table2 (str) – Second target table name; it must be in the same database as table1.
- Example:
>>> td.swap_tables("mydb.tbl1", "mydb.tbl2")
TDSparkContextBuilder
- class td_pyspark.td_pyspark.TDSparkContextBuilder(builder)
Utility class for setting TD-specific configuration and validating it
- ENDPOINTS = ['us', 'jp', 'eu01', 'ap02']
- classmethod default_jar_path()
Returns default td-spark jar path in the package
- Returns:
Default td-spark jar path included in the td-pyspark package.
- Return type:
str
- Example:
>>> TDSparkContextBuilder.default_jar_path()
'/usr/local/lib/python3.7/site-packages/td_pyspark/jars/td-spark-assembly.jar'
- __init__(builder)
- Parameters:
builder (pyspark.sql.SparkSession.Builder) – A builder for SparkSession.
- apikey(apikey)
Set apikey for td-spark
- Parameters:
apikey (str) – apikey for TreasureData
- Returns:
self
- api_endpoint(endpoint)
Set configuration of API host for TreasureData
- Parameters:
endpoint (str) – API host name for TreasureData
- Returns:
self
- presto_endpoint(endpoint)
Set configuration of Presto API host
- Parameters:
endpoint (str) – Presto API host name for TreasureData
- Returns:
self
- plazma_endpoint(endpoint)
Set configuration for Plazma API host
- Parameters:
endpoint (str) – Plazma API host name for TreasureData
- Returns:
self
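- Example:
A hedged sketch of overriding the API, Presto, and Plazma hosts in one builder chain; the host names below are placeholders for illustration only, not documented endpoints:
>>> import os
>>> from td_pyspark import TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> builder = TDSparkContextBuilder(SparkSession.builder).apikey(os.getenv("TD_API_KEY"))
>>> # each setter returns self, so the calls can be chained
>>> builder = builder.api_endpoint("api.example-td-host.com") \
...     .presto_endpoint("presto.example-td-host.com") \
...     .plazma_endpoint("plazma.example-td-host.com")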
- site(siteName)
Set td-spark site to use
- Parameters:
siteName (str) – “us”, “jp”, “eu01”, or “ap02”
- Returns:
self
- jars(jar_path)
Set spark.jars
- Parameters:
jar_path (str) – Comma-separated list of jar file paths. Globs are allowed
- Returns:
self
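- Example:
A short sketch of passing more than one jar; the extra path is hypothetical and only illustrates the comma-separated, glob-friendly format:
>>> from td_pyspark import TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> builder = TDSparkContextBuilder(SparkSession.builder)
>>> # combine the bundled assembly jar with any extra jars matched by a glob
>>> builder = builder.jars(TDSparkContextBuilder.default_jar_path() + ",/opt/extra-jars/*.jar")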
- build()
Build TDSparkContext
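- Example:
A minimal end-to-end sketch of the builder chain, not part of the original reference; it assumes TD_API_KEY is set and uses the jar bundled with the package:
>>> import os
>>> from td_pyspark import TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> td = TDSparkContextBuilder(SparkSession.builder) \
...     .apikey(os.getenv("TD_API_KEY")) \
...     .site("us") \
...     .jars(TDSparkContextBuilder.default_jar_path()) \
...     .build()
>>> type(td)
<class 'td_pyspark.td_pyspark.TDSparkContext'>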
TDDatabase
- class td_pyspark.td_pyspark.TDDatabase(db, sc, sqlContext)
A class representing a database on Treasure Data.
- __init__(db, sc, sqlContext)
- Parameters:
db – A database object of td-spark
sc (pyspark.SparkContext) – PySpark SparkContext
sqlContext (pyspark.sql.SQLContext) – PySpark SQLContext
- exists()
Check database existence
- Returns:
Existence of the database
- Return type:
bool
- Example:
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> db1 = td.db("mydb")
>>> db1.exists()
True
- create_if_not_exists()
Create a database if not exists
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> db1 = td.db("mydbtest")
>>> db1.exists()
False
>>> db1.create_if_not_exists()
2019-09-09T10:27:10.710-0700 warn [TDDatabase] Creating database mydbtest if not exists - (TDDatabase.scala:38)
- drop_if_exists()
Drop a database if exists
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> db1 = td.db("mydbtest")
>>> db1.exists()
True
>>> db1.drop_if_exists()
2019-09-09T10:27:10.710-0700 warn [TDDatabase] Dropping database mydbtest if not exists - (TDDatabase.scala:38)
- table(table)
Fetch the target TD table
- Parameters:
table (str) – Target table name.
- Returns:
Target TDTable
- Return type:
TDTable
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> db1 = td.db("sample_datasets")
>>> db1.table("www_access")
<td_pyspark.td_pyspark.TDTable at 0x10a10c400>
TDTable
- class td_pyspark.td_pyspark.TDTable(table, sc, sqlContext)
A class representing a table of Treasure Data
- __init__(table, sc, sqlContext)
- Parameters:
table – A table object of td-spark
sc (pyspark.SparkContext) – PySpark SparkContext
sqlContext (pyspark.sql.SQLContext) – PySpark SQLContext
- within(duration, timezone=None)
Filter a table with time range like TD_INTERVAL
- Parameters:
duration (str) – A string to specify a target time range (e.g., “-1h”). For detailed syntax, see also the TD_INTERVAL function in Presto.
timezone (str, optional) – Time zone ID (e.g., “Asia/Tokyo”) used to interpret the duration.
- Returns:
A TD table filtered by the given duration
- Return type:
TDTable
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
# to read the last 1 hour range of data
>>> td.table("tbl").within("-1h").df()
# to read the last 1 day range of data
>>> td.table("tbl").within("-1d").df()
# to read the last 1 day range of data in JST timezone
>>> td.table("tbl").within("-1d", "Asia/Tokyo")
# to read the last day's data beginning from 7 days ago
>>> td.table("tbl").within("-1d/-7d").df()
# to set specific time range
>>> td.table("sample_datasets.www_access").within("2014-10-04/2014-10-05").df()
- drop_if_exists()
Drop a table if exists
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("tbl").drop_if_exists()
- create_if_not_exists()
Create a table if not exists
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("tbl").create_if_not_exists()
- swap_table_with(target_table)
Swap the contents of tables
- Parameters:
target_table (str) – A target table name. The target table must be in the same database.
- Example:
>>> td.table("mydb.tbl1").swap_table_with("tbl2")
- exists()
Check the existence of the table
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("tbl").exists()
True
- within_unixtime_range(from_unixtime, to_unixtime)
Filter a table with a Unix time range
This method filters the table to the range [from_unixtime, to_unixtime).
- Parameters:
from_unixtime (int) – Beginning unix time of the range, which is included.
to_unixtime (int) – End unix time of the range, which isn’t included.
- Returns:
A table filtered by the Unix time range
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> t1 = td.table("sample_datasets.www_access")
>>> t1.within_unixtime_range(1412320845, 1412321000)
- within_utc_time_range(from_string, to_string)
Filter a table with a time range in the UTC time zone
This method filters the table to the range [from_string, to_string).
- Parameters:
from_string (str) – Beginning of the range with yyyy-MM-dd HH:mm:ss format e.g. “2014-10-03 09:12:00”
to_string (str) – End of the range with yyyy-MM-dd HH:mm:ss format.
- Returns:
A table filtered by the UTC time range
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> t1 = td.table("sample_datasets.www_access")
>>> t1.within_utc_time_range("2014-10-03 09:12:00", "2014-10-03 09:13:00")
- within_time_range(from_string, to_string, timezone=None)
Filter a table with a time range in the specified time zone
This method filters the table to the range [from_string, to_string).
- Parameters:
from_string (str) – Beginning of the range with yyyy-MM-dd HH:mm:ss format e.g. “2014-10-03 09:12:00”
to_string (str) – End of the range with yyyy-MM-dd HH:mm:ss format.
timezone (str) – Time Zone ID string which is passed to java.time.ZoneId.of(), e.g. “Asia/Tokyo”
- Returns:
A table filtered by the specified time range
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> t1 = td.table("sample_datasets.www_access")
>>> t1.within_time_range("2014-10-03 18:12:00", "2014-10-03 18:13:00", "Asia/Tokyo")
- df()
Convert table into a Spark DataFrame
- Returns:
A Spark DataFrame representing the TDTable
- Return type:
pyspark.sql.DataFrame
- Example:
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")). \
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("sample_datasets.www_access").df()