import request from 'request';
import qs from 'querystring';
import merge from 'merge';
/**
* Treasure Data REST API Client
* @author jwyuan and lewuathe
* @license Apache-2.0
* @version 0.3.0
* @requires request
* @requires querystring
* @requires merge
* @see http://docs.treasuredata.com/articles/rest-api-node-client
*
* @example
* var TD = require('td');
* var client = new TD('TREASURE_DATA_API_KEY');
*
* var fnPrint = function(err, results) {
* console.log(results);
* };
* client.listDatabase(function(err, results) {
* for (var i = 0; i < results.databases.length; i++) {
* client.listTables(results.databases[i].name, fnPrint);
* }
* });
*
* @constructor
* @param {string} apikey - The API key available from user account.
* @param {object} options - Specify the endpoint of TreasureData api.
* options.host = 'api.treasuredata.com'
* options.protocol = 'http'
*/
export default class TDClient {
constructor(apikey, options) {
this.options = options || {};
this.options.apikey = apikey;
this.options.host = this.options.host || 'api.treasuredata.com';
this.options.protocol = this.options.protocol || 'http';
this.options.headers = this.options.headers || {};
this.baseUrl = `${this.options.protocol}://${this.options.host}`;
}
/**
* Return the list of all databases belongs to given account
* @param {function} callback - Callback function which receives
* error object and results object
* @example
* // Results object
* {name: 'db1', count: 1, created_at: 'XXX',
* updated_at: 'YYY', organization: null,
* permission: 'administrator'}
*/
listDatabases(callback) {
this._request("/v3/database/list", {
method: 'GET',
json: true
}, callback);
}
/**
* Delete the given named database
* @param {string} db - The name of database
* @param {function} callback - Callback function which receives
* error object and results object
*/
deleteDatabase(db, callback) {
this._request("/v3/database/delete/" + qs.escape(db), {
method: 'POST',
json: true
}, callback);
}
/**
* Create the given named database
* @param {string} db - The name of database
* @param {function} callback - Callback function which receives
* error object and results object
*/
createDatabase(db, callback) {
this._request("/v3/database/create/" + qs.escape(db), {
method: 'POST',
json: true
}, callback);
}
/**
* Return the list of all tables belongs to given database
* @param {string} db - The name of database
* @param {function} callback - Callback function which receives
* error object and results object
*/
listTables(db, callback) {
this._request("/v3/table/list/" + qs.escape(db), {
json: true
}, callback);
}
/**
* Create log type table in the given database
* @param {string} db - The name of database
* @param {string} table - The name of table
* @param {function} callback - Callback function which receives
* error object and results object
*/
createLogTable(db, table, callback) {
this.createTable(db, table, 'log', callback);
}
/**
* Create item type table in the given database
* @deprecated
* @param {string} db - The name of database
* @param {string} table - The name of table
* @param {function} callback - Callback function which receives
* error object and results object
*/
createItemTable(db, table, callback) {
this.createTable(db, table, 'item', callback);
}
/**
* Create table in given database
* @param {string} db - The name of database
* @param {string} table - The name of table
* @param {string} type - The type of table ('log' or 'item')
* @param {function} callback - Callback function which receives
* error object and results object
*/
createTable(db, table, type, callback) {
this._request("/v3/table/create/" + qs.escape(db) + "/" + qs.escape(table) + "/" + qs.escape(type), {
method: 'POST',
json: true
}, callback);
}
/**
* Swap the content of two tables
* @param {string} db - The name of database
* @param {string} table1 - The first table
* @param {string} table2 - The second table
* @param {function} callback - Callback function which receives
* error object and results object
*/
swapTable(db, table1, table2, callback) {
this._request("/v3/table/swap/" + qs.escape(db) + "/" + qs.escape(table1) + "/" + qs.escape(table2), {
method: 'POST',
json: true
}, callback);
}
updateSchema(db, table, schema_json, callback) {
this._request("/v3/table/update-schema/" + qs.escape(db) + "/" + qs.escape(table), {
method: 'POST',
body: schema_json,
json: true
}, callback);
}
deleteTable(db, table, callback) {
this._request("/v3/table/delete/" + qs.escape(db) + "/" + qs.escape(table), {
method: 'POST',
json: true
}, callback);
}
tail(db, table, count, to, from, callback) {
if (typeof count === 'function') {
callback = count;
count = null;
} else if (typeof to === 'function') {
callback = to;
to = null;
} else if (typeof from === 'function') {
callback = from;
from = null;
}
let params = {
// format: 'msgpack'
};
if (count) {
params.count = count;
}
if (to) {
params.to = to;
}
if (from) {
params.from = from;
}
this._request("/v3/table/tail/" + qs.escape(db) + "/" + qs.escape(table), {
method: 'GET',
qs: params
}, callback);
}
/**
* Return the list of all jobs run by your account
* @param {string} from - The start of the range of list
* @param {string} to - The end of the range of list
* @param {string} status - The status of returned jobs
* @param {function} callback - Callback function which receives
* error object and results object
*
*/
listJobs(from, to, status, conditions, callback) {
if (typeof from === 'function') {
callback = from;
from = 0;
} else if (typeof to === 'function') {
callback = to;
to = null;
} else if (typeof status === 'function') {
callback = status;
status = null;
} else if (typeof conditions === 'function') {
callback = conditions;
conditions = null;
}
let params = {
from: from
};
if (to) {
params.to = to;
}
if (status) {
params.status = status;
}
if (conditions) {
params.conditions = conditions;
}
this._request("/v3/job/list/", {
qs: params,
json: true
}, callback);
}
/**
* Returns the status and logs of a given job
* @param {string} job_id - The job id
* @param {function} callback - Callback function which receives
* error object and results object
*
*/
showJob(job_id, callback) {
this._request("/v3/job/show/" + qs.escape(job_id), {
json: true
}, callback);
}
/**
* Returns the result of a specific job.
* @param {string} job_id - The job id
* @param {string} format - Format to receive data back in, defaults
* to tsv
* @param {function} callback - Callback function which receives
* error object and results object
*/
jobResult(job_id, format, callback) {
let opts = {
method: 'GET',
qs: { format: 'tsv' }
};
if (typeof format === 'function') {
callback = format;
}else{
opts.qs.format = format;
}
this._request("/v3/job/result/" + qs.escape(job_id), opts, callback);
}
/**
* Kill the currently running job
* @param {string} job_id - the job id
* @param {function} callback - Callback function which receives
* error object and results object
*/
kill(job_id, callback) {
this._request("/v3/job/kill/" + qs.escape(job_id), {
method: 'POST',
json: true
}, callback);
}
/**
* Submit Hive type job
* @param {string} db - The name of database
* @param {string} query - The Hive query which run on given database
* @param {object} opts - Supported options are `result`,
* `priority` and `retry_limit`
* @param {function} callback - Callback function which receives
* error object and results object
*
* @see {@link https://docs.treasuredata.com/categories/hive}
*/
hiveQuery(db, query, opts, callback) {
if (typeof opts === 'function') {
callback = opts;
opts = {};
}
this._query(db, 'hive', query, opts, callback)
}
/**
* Submit Presto type job
* @param {string} db - The name of database
* @param {string} query - The Presto query which run on given database
* @param {object} opts - Supported options are `result`,
* `priority` and `retry_limit`
* @param {function} callback - Callback function which receives
* error object and results object
*
* @see {@link https://docs.treasuredata.com/categories/presto}
*/
prestoQuery(db, query, opts, callback) {
if (typeof opts === 'function') {
callback = opts;
opts = {};
}
this._query(db, 'presto', query, opts, callback)
}
// Export API
export(db, table, storage_type, opts, callback) {
if (typeof opts === 'function') {
callback = opts;
opts = {};
}
opts.storage_type = storage_type;
this._request("/v3/export/run/" + qs.escape(db) + "/" + qs.escape(table), {
method: 'POST',
body: opts,
json: true
}, callback);
}
/**
* Create scheduled job
*
* @param {string} name - The name of scheduled job
* @param {object} opts - Supported options are `cron` and `query`.
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/categories/scheduled-job}
*
*/
createSchedule(name, opts, callback) {
if (!opts.cron || !opts.query) {
return callback(new Error('opts.cron and opts.query is required!'), {});
}
opts.type = 'hive';
this._request("/v3/schedule/create/" + qs.escape(name), {
method: 'POST',
body: opts,
json: true
}, callback);
}
/**
* Delete scheduled job
*
* @param {string} name - The name of scheduled job
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/categories/scheduled-job}
*
*/
deleteSchedule(name, callback) {
this._request("/v3/schedule/delete/" + qs.escape(name), {
method: 'POST',
json: true
}, callback);
}
/**
* Show the list of scheduled jobs
*
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/categories/scheduled-job}
*
*/
listSchedules(callback) {
this._request("/v3/schedule/list", {
method: 'GET',
json: true
}, callback);
}
/**
* Update the scheduled job
*
* @param {string} name - The name of scheduled job
* @param {object} params - Updated content
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/categories/scheduled-job}
*/
updateSchedule(name, params, callback) {
this._request("/v3/schedule/update/" + qs.escape(name), {
body: params,
method: 'POST',
json: true
}, callback);
}
history(name, from, to, callback) {
if (typeof from === 'function') {
callback = from;
from = 0;
} else if (typeof from === 'function') {
callback = to;
to = null;
}
let params = {};
if (from) {
params.from = from;
}
if (to) {
params.to = to;
}
this._request("/v3/schedule/history/" + qs.escape(name), {
method: 'GET',
qs: params,
json: true
}, callback);
}
runSchedule(name, time, num, callback) {
if (typeof num === 'function') {
callback = num;
num = null;
}
let params = {};
if (num) {
params.num = num;
}
this._request("/v3/schedule/run/" + qs.escape(name) + "/" + qs.escape(time), {
method: 'POST',
qs: params,
json: true
}, callback);
}
// Import API
import(db, table, format, stream, size, unique_id, callback) {
if (typeof unique_id === 'function') {
callback = unique_id;
unique_id = null;
}
let path;
if (unique_id) {
path = "/v3/table/import_with_id/" + qs.escape(db) + "/" + qs.escape(table) + "/" + qs.escape(unique_id) + "/" + qs.escape(format);
} else {
path = "/v3/table/import/" + qs.escape(db) + "/" + qs.escape(table) + "/" + qs.escape(format);
}
this._put(path, {
method: 'PUT',
headers: {
'Content-Type': 'application/octet-stream',
'Content-Length': size
}
}, stream, callback);
}
// Result API
listResult(callback) {
this._request("/v3/result/list", {
json: true
}, callback);
}
createResult(name, url, callback) {
this._request("/v3/result/create/" + qs.escape(name), {
method: 'POST',
body: { 'url': url },
json: true
}, callback);
}
deleteResult(name, url, callback) {
this._request("/v3/result/delete/" + qs.escape(name), {
method: 'POST',
body: { 'url': url },
json: true
}, callback);
}
// Server Status API
serverStatus(callback) {
this._request("/v3/system/server_status", {
json: true
}, callback);
}
// Bulk import APIs
/**
* Create bulk import session
* @param {string} name - The session name
* @param {string } db - Database name where data is imported
* @param {string } table - Table name where data is imported
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
*/
createBulkImport(name, db, table, callback) {
this._request('/v3/bulk_import/create/' + qs.escape(name) + '/' + qs.escape(db) + '/' + qs.escape(table), {
method: 'POST',
json: true
}, callback);
}
/**
* Delete bulk import session
* @param {string} name - The session name
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
*/
deleteBulkImport(name, callback) {
this._request('/v3/bulk_import/delete/' + qs.escape(name), {
method: 'POST',
json: true
}, callback);
}
/**
* Show the information about specified bulk import session
* @param {string} name - The session name
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
*/
showBulkImport(name, callback) {
this._request('/v3/bulk_import/show/' + qs.escape(name), {
method: 'GET',
json: true
}, callback);
}
/**
* Show the list of all bulk import sessions
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
*/
listBulkImports(callback) {
this._request('/v3/bulk_import/list', {
method: 'GET',
json: true
}, callback);
}
/**
* Show the list of all partitions of specified bulk import session
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
*/
listBulkImportParts(name, callback) {
this._request('/v3/bulk_import/list_parts/' + qs.escape(name), {
method: 'GET',
json: true
}, callback);
}
/**
* Upload a partition file for the specified bulk import session
* @param {string} name - The bulk import session name
* @param {string} partName - The partition name
* @param {stream.Readable} stream - Readable stream that reads partition file
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
* @example
* var stream = fs.createReadStream('part_file.msgpack.gz');
* client.bulkImportUploadPart('bulk_import_session', 'part1', stream, function(err, results) {
* // Obtain bulk import uploading result
* });
*/
bulkImportUploadPart(name, partName, stream, callback) {
this._put('/v3/bulk_import/upload_part/' + qs.escape(name) + '/' + qs.escape(partName),
{json: true}, stream, callback);
}
/**
* Delete specified partition from given bulk import session
* @param {string} name - The bulk import session name
* @param {string} partName - The partition name
* @param {function} callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
*/
bulkImportDeletePart(name, partName, callback) {
this._request('/v3/bulk_import/delete_part/' + qs.escape(name) + '/' + qs.escape(partName), {
method: 'POST',
json: true
}, callback);
}
/**
* Run the job for processing partition files inside Treasure Data service
* @param {string} name - The bulk import session name
* @param callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
*/
performBulkImport(name, callback) {
this._request('/v3/bulk_import/perform/' + qs.escape(name), {
method: 'POST',
json: true
}, callback)
}
/**
* Confirm the bulk import session is finished successfully
* @param {string} name - The bulk import session name
* @param callback - Callback function which receives
* error object and results object
* @see {@link https://docs.treasuredata.com/articles/bulk-import}
*/
commitBulkImport(name, callback) {
this._request('/v3/bulk_import/commit/' + qs.escape(name), {
method: 'POST',
json: true
}, callback)
}
/**
* _query: Protected method
* @protected
*/
_query(db, query_type, q, opts, callback) {
opts.query = q;
this._request("/v3/job/issue/" + query_type + "/" + qs.escape(db), {
method: 'POST',
body: opts,
json: true
}, callback);
}
/**
* _request: Protected method
* @protected
*/
_request(path, options, callback) {
if (typeof options === 'function') {
callback = options;
options = {};
}
options.uri = this.baseUrl + path;
options.headers = { 'Authorization': 'TD1 ' + this.options.apikey };
// Merge custom headers
options.headers = merge(options.headers, this.options.headers);
callback = callback;
request(options, (err, res, body) => {
if (err) { return callback(err, undefined); }
if (res.statusCode < 200 || res.statusCode > 299) {
return callback(new Error((body && body.error) || 'HTTP ' + res.statusCode),
body || {});
}
callback(null, body || {});
});
}
/**
* _put: Protected method
* @protected
*/
_put(path, options, stream, callback) {
options.uri = this.baseUrl + path;
options.headers = { 'Authorization': 'TD1 ' + this.options.apikey };
stream.pipe(request.put(options, (err, res, body) => {
if (err) { return callback(err, undefined); }
if (res.statusCode < 200 || res.statusCode > 299) {
return callback(new Error((body && body.error) || 'HTTP ' + res.statusCode),
body || {});
}
callback(null, body || {});
}));
}
}