td-pyspark: API Reference

TDSparkContext

class td_pyspark.td_pyspark.TDSparkContext(spark, td=None)

Treasure Data Spark Context

__init__(spark, td=None)
Parameters:
  • spark (pyspark.sql.SparkSession) – A SparkSession already connected to Spark.

  • td (TDSparkContext, optional) – Treasure Data Spark Context.
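
Example:

A minimal construction from an existing SparkSession, following the same pattern used in the with_apikey example below:

>>> from td_pyspark import TDSparkContext
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)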

df(table)

Load Treasure Data table into Spark DataFrame

Parameters:

table (str) – Table name of Treasure Data.

Returns:

Loaded table data.

Return type:

pyspark.sql.DataFrame
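
Example:

A minimal sketch, assuming the TDSparkContext is built as in the other examples on this page and that the public sample_datasets.www_access table is readable with your apikey:

>>> import os
>>> from td_pyspark import TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.df("sample_datasets.www_access")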

presto(sql, database=None)

Submit Presto Query

Parameters:
  • sql (str) – A SQL statement to be executed.

  • database (str, optional) – Target database name.

Returns:

SQL result

Return type:

pyspark.sql.DataFrame

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> sql = "select code, count(*) cnt from sample_datasets.www_access group by 1"
>>> q = td.presto(sql)
>>> q.show()
2019-06-13 20:09:13.245Z  info [TDPrestoJDBCRDD]  - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500|   2|
| 404|  17|
+----+----+
execute_presto(sql, database=None)

Run non-query statements (e.g., INSERT INTO, CREATE TABLE)

Parameters:
  • sql (str) – A SQL statement to be executed.

  • database (str, optional) – Target database name.

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")
table(table)

Fetch TreasureData table

Parameters:

table (str) – Table name

Returns:

TD table data. The df() method should be called to treat it as a pyspark.sql.DataFrame.

Return type:

TDTable

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("sample_datasets.www_access")
<td_pyspark.td_pyspark.TDTable object at 0x10eedf240>
db(name)

Fetch TreasureData database

Parameters:

name (str) – Database name

Returns:

TD database data. The df() method should be called to treat it as a pyspark.sql.DataFrame.

Return type:

TDDatabase

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.db("sample_datasets")
<td_pyspark.td_pyspark.TDDatabase object at 0x10eedfa58>
set_log_level(log_level)

Set log level for Spark

Parameters:

log_level (str) – Log level for Spark process. {“ALL”, “DEBUG”, “ERROR”, “FATAL”, “INFO”, “OFF”, “TRACE”, “WARN”}

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.set_log_level("DEBUG")
2019-09-06T18:19:12.398-0700  info [TDSparkContext] Setting the log level of com.treasuredata.spark to DEBUG - (TDSparkContext.scala:62)
use(name)

Change current database

Parameters:

name (str) – Target database name to be changed.

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.use("mydb")
2019-09-06T18:19:49.469-0700  info [TDSparkContext] Use mydb - (TDSparkContext.scala:150)
with_apikey(apikey)

Set an additional apikey

Parameters:

apikey (str) – apikey for TreasureData

Example:

>>> from td_pyspark import TDSparkContext
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> td2 = td.with_apikey("key2")
write(df, table_name, mode='error')

Write a DataFrame as a TreasureData table

Parameters:
  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.

  • table_name (str) – Target table name to be inserted.

  • mode (str)

    Save mode, same as Spark's. {“error”, “overwrite”, “append”, “ignore”}

    • error: raise an exception.

    • overwrite: drop the existing table, recreate it, and insert data.

    • append: insert data. Create the table if it does not exist.

    • ignore: do nothing.

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.write(df, "mydb.table1", "error")
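
The other save modes are passed the same way. A short sketch, reusing df and mydb.table1 from the example above:

>>> td.write(df, "mydb.table1", "append")
>>> td.write(df, "mydb.table1", "overwrite")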
insert_into(df, table_name)

Insert a DataFrame into existing TreasureData table

Parameters:
  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.

  • table_name (str) – Target table name to be inserted.

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.insert_into(df, "mydb.table1")
2019-09-09T10:57:37.558-0700  info [TDWriter] Uploading data to mydb.table1 (mode: Append) - (TDWriter.scala:66)
2019-09-09T10:57:38.187-0700  info [TDWriter] [txx:8184891a] Starting a new transaction for updating mydb.table1 - (TDWriter.scala:95)
2019-09-09T10:57:42.897-0700  info [TDWriter] [txx:8184891a] Finished uploading 1 partitions (1 records, size:132B) to mydb.table1 - (TDWriter.scala:132)
create_or_replace(df, table_name)

Create or replace a TreasureData table with a DataFrame

Parameters:
  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.

  • table_name (str) – Target table name to be ingested.

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.create_or_replace(df, "mydb.table1")
2019-09-09T10:57:56.381-0700  warn [DefaultSource] Dropping mydb.table1 (Overwrite mode) - (DefaultSource.scala:94)
2019-09-09T10:57:56.923-0700  info [TDWriter] Uploading data to mydb.table1 (mode: Overwrite) - (TDWriter.scala:66)
2019-09-09T10:57:57.106-0700  info [TDWriter] [txx:a69bce97] Starting a new transaction for updating aki.tds_test - (TDWriter.scala:95)
2019-09-09T10:57:59.179-0700  info [TDWriter] [txx:a69bce97] Finished uploading 1 partitions (1 records, size:132B) to aki.tds_test - (TDWriter.scala:132)
create_table_if_not_exists(table_name)

Create a table if not exists

Parameters:

table_name (str) – Target table name to be created.

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_table_if_not_exists("mydb.table1")
2019-09-09T13:43:41.142-0700  warn [TDTable] Creating table aki.tds_test if not exists - (TDTable.scala:67)
drop_table_if_exists(table_name)

Drop a table if exists

Parameters:

table_name (str) – Target table name to be dropped.

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_table_if_exists("mydb.table1")
create_database_if_not_exists(db_name)

Create a database if not exists

Parameters:

db_name (str) – Target database name to be created.

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_database_if_not_exists("mydb")
drop_database_if_exists(db_name)

Drop a database if exists

Parameters:

db_name (str) – Target database name to be dropped

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_database_if_exists("mydb")
create_udp_l(table_name, long_column_name)

Create a User-Defined Partition table partitioned by a Long type column

User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by Long type column.

Parameters:
  • table_name (str) – Target table name to be created as a UDP table.

  • long_column_name (str) – Partition column of Long (bigint) type

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_l("mydb.departments", "dept_id")
2019-09-09T10:43:20.913-0700  info [UDP]  - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."departments" (
  time bigint,
  "dept_id" bigint
)
with (
  bucketed_on = array['dept_id'],
  bucket_count = 512
)
create_udp_s(table_name, string_column_name)

Create a User-Defined Partition table partitioned by a string type column

User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by string type column.

Parameters:
  • table_name (str) – Target table name to be created as a UDP table.

  • string_column_name (str) – Partition column of string type

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_s("mydb.user_list", "id")
2019-09-09T10:45:27.802-0700  info [UDP]  - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."user_list" (
  time bigint,
  "id" varchar
)
with (
  bucketed_on = array['id'],
  bucket_count = 512
)
swap_tables(table1, table2)

Swap table contents within the same database.

Parameters:
  • table1 (str) – First target table name.

  • table2 (str) – Second target table name; must be in the same database as table1.

Example:

>>> td.swap_tables("mydb.tbl1", "mydb.tbl2")

TDSparkContextBuilder

class td_pyspark.td_pyspark.TDSparkContextBuilder(builder)

Utility class to set TD-specific configuration and perform validation

ENDPOINTS = ['us', 'jp', 'eu01', 'ap02']
classmethod default_jar_path()

Returns default td-spark jar path in the package

Returns:

Default td-spark jar path included in the td-pyspark package.

Return type:

str

Example:

>>> TDSparkContextBuilder.default_jar_path()
'/usr/local/lib/python3.7/site-packages/td_pyspark/jars/td-spark-assembly.jar'
__init__(builder)
Parameters:

builder (pyspark.sql.SparkSession.Builder) – A builder for SparkSession.

apikey(apikey)

Set apikey for td-spark

Parameters:

apikey (str) – apikey for TreasureData

Returns:

self

api_endpoint(endpoint)

Set configuration of API host for TreasureData

Parameters:

endpoint (str) – API host name for TreasureData

Returns:

self
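
Example:

A sketch; api.treasuredata.com is the Treasure Data US-region API host, and the builder is assumed to be created as in the other examples:

>>> from td_pyspark import TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> b = TDSparkContextBuilder(SparkSession.builder).api_endpoint("api.treasuredata.com")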

presto_endpoint(endpoint)

Set configuration of Presto API host

Parameters:

endpoint (str) – Presto API host name for TreasureData

Returns:

self

plazma_endpoint(endpoint)

Set configuration for Plazma API host

Parameters:

endpoint (str) – Plazma API host name for TreasureData

Returns:

self

site(siteName)

Set td-spark site to use

Parameters:

siteName (str) – “us”, “jp”, “eu01”, or “ap02”

Returns:

self

jars(jar_path)

Set spark.jars

Parameters:

jar_path (str) – Comma-separated list of jar file paths. Globs are allowed

Returns:

self
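
Example:

A sketch; /path/to/extra.jar is a hypothetical additional jar, shown only to illustrate the comma-separated form:

>>> import os
>>> from td_pyspark import TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> jar_paths = TDSparkContextBuilder.default_jar_path() + ",/path/to/extra.jar"
>>> td = TDSparkContextBuilder(SparkSession.builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(jar_paths).build()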

build()

Build a TDSparkContext with the configured settings.

Returns:

A TDSparkContext connected to Treasure Data.

Return type:

TDSparkContext
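
Example:

A sketch combining the builder methods above; "us" is one of the values listed in ENDPOINTS, and TD_API_KEY is assumed to be set in the environment:

>>> import os
>>> from td_pyspark import TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> td = TDSparkContextBuilder(SparkSession.builder) \
...     .apikey(os.getenv("TD_API_KEY")) \
...     .site("us") \
...     .jars(TDSparkContextBuilder.default_jar_path()) \
...     .build()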

TDDatabase

class td_pyspark.td_pyspark.TDDatabase(db, sc, sqlContext)

A class representing a database on Treasure Data.

__init__(db, sc, sqlContext)
Parameters:
  • db – A database object of td-spark

  • sc (pyspark.SparkContext) – PySpark SparkContext

  • sqlContext (pyspark.sql.SQLContext) – PySpark SQLContext

exists()

Check database existence

Returns:

Existence of the database

Return type:

bool

Example:

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> db1 = td.db("mydb")
>>> db1.exists()
True
create_if_not_exists()

Create a database if not exists

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> db1 = td.db("mydbtest")
>>> db1.exists()
False
>>> db1.create_if_not_exists()
2019-09-09T10:27:10.710-0700  warn [TDDatabase] Creating database mydbtest if not exists - (TDDatabase.scala:38)
drop_if_exists()

Drop a database if exists

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> db1 = td.db("mydbtest")
>>> db1.exists()
True
>>> db1.drop_if_exists()
2019-09-09T10:27:10.710-0700  warn [TDDatabase] Dropping database mydbtest if not exists - (TDDatabase.scala:38)
table(table)

Fetch the target TD table

Parameters:

table (str) – Target table name.

Returns:

Target TDTable

Return type:

TDTable

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> db1 = td.db("sample_datasets")
>>> db1.table("www_access")
<td_pyspark.td_pyspark.TDTable at 0x10a10c400>

TDTable

class td_pyspark.td_pyspark.TDTable(table, sc, sqlContext)

A class representing a table of Treasure Data.

__init__(table, sc, sqlContext)
Parameters:
  • table – A table object of td-spark

  • sc (pyspark.SparkContext) – PySpark SparkContext

  • sqlContext (pyspark.sql.SQLContext) – PySpark SQLContext

within(duration, timezone=None)

Filter a table with time range like TD_INTERVAL

Parameters:

  • duration (str) – A string to specify a target time range (e.g., “-1h”). For detailed syntax, see also TD_INTERVAL function in Presto.

  • timezone (str, optional) – Time Zone ID string which is passed to java.time.ZoneId.of(), e.g., “Asia/Tokyo”.

Returns:

A TD table filtered by duration

Return type:

TDTable

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
# to read the last 1 hour range of data
>>> td.table("tbl").within("-1h").df()
# to read the last 1 day range of data
>>> td.table("tbl").within("-1d").df()
# to read the last 1 day range of data in JST timezone
>>> td.table("tbl").within("-1d", "Asia/Tokyo")
# to read the last day's data beginning from 7 days ago
>>> td.table("tbl").within("-1d/-7d").df()
# to set specific time range
>>> td.table("sample_datasets.www_access").within("2014-10-04/2014-10-05").df()
drop_if_exists()

Drop a table if exists

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("tbl").drop_if_exists()
create_if_not_exists()

Create a table if not exists

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("tbl").create_if_not_exists()
swap_table_with(target_table)

Swap the contents of tables

Parameters:

target_table – A target table name. This target table must be in the same database.

Example:

>>> td.table("mydb.tbl1").swap_table_with("tbl2")
exists()

Check the existence of the table

Returns:

Existence of the table

Return type:

bool

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("tbl").exists()
True
within_unixtime_range(from_unixtime, to_unixtime)

Filter a table with a Unix time range

This method filters the table to the range [from_unixtime, to_unixtime).

Parameters:
  • from_unixtime (int) – Beginning unix time of the range, which is included.

  • to_unixtime (int) – End unix time of the range, which isn’t included.

Returns:

A table filtered by the Unix time range

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> t1 = td.table("sample_datasets.www_access")
>>> t1.within_unixtime_range(1412320845, 1412321000)
within_utc_time_range(from_string, to_string)

Filter a table with a time range in the UTC time zone

This method filters the table to the range [from_string, to_string).

Parameters:
  • from_string (str) – Beginning of the range with yyyy-MM-dd HH:mm:ss format e.g. “2014-10-03 09:12:00”

  • to_string (str) – End of the range with yyyy-MM-dd HH:mm:ss format.

Returns:

A table filtered by the UTC time range

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> t1 = td.table("sample_datasets.www_access")
>>> t1.within_utc_time_range("2014-10-03 09:12:00", "2014-10-03 09:13:00")
within_time_range(from_string, to_string, timezone=None)

Filter a table with a time range in the specified time zone

This method filters the table to the range [from_string, to_string).

Parameters:
  • from_string (str) – Beginning of the range with yyyy-MM-dd HH:mm:ss format e.g. “2014-10-03 09:12:00”

  • to_string (str) – End of the range with yyyy-MM-dd HH:mm:ss format.

  • timezone (str) – Time Zone ID string which is passed to java.time.ZoneId.of() e.g. “Asia/Tokyo”

Returns:

A table filtered by the time range in the specified time zone

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> t1 = td.table("sample_datasets.www_access")
>>> t1.within_time_range("2014-10-03 18:12:00", "2014-10-03 18:13:00", "Asia/Tokyo")
df()

Convert table into a Spark DataFrame

Returns:

A Spark DataFrame representing the TDTable

Return type:

pyspark.sql.DataFrame

Example:

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...      jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("sample_datasets.www_access").df()