From bd526d221617a8693bc9c689c959147b5bf986e2 Mon Sep 17 00:00:00 2001 From: StoneT2000 Date: Fri, 2 Aug 2019 19:32:33 -0700 Subject: [PATCH] Node.js Connector updated to 1.3.0 - Subscription, Continuous Query, Bug Fixes Subscription implemented Continuous Query implemented Fixed bugs when dealing with null values retrieved from tables Removed unused code Added basic performance test code --- src/connector/nodejs/nodetaos/cinterface.js | 234 ++++++++++++++++++-- src/connector/nodejs/nodetaos/constants.js | 2 +- src/connector/nodejs/nodetaos/cursor.js | 170 ++++++++++---- src/connector/nodejs/nodetaos/taosresult.js | 12 +- src/connector/nodejs/package-lock.json | 2 +- src/connector/nodejs/package.json | 2 +- src/connector/nodejs/readme.md | 5 +- src/connector/nodejs/test/performance.js | 89 ++++++++ src/connector/nodejs/test/test.js | 78 ++++++- 9 files changed, 506 insertions(+), 88 deletions(-) create mode 100644 src/connector/nodejs/test/performance.js diff --git a/src/connector/nodejs/nodetaos/cinterface.js b/src/connector/nodejs/nodetaos/cinterface.js index 09ba9cca88..28bc9817c5 100644 --- a/src/connector/nodejs/nodetaos/cinterface.js +++ b/src/connector/nodejs/nodetaos/cinterface.js @@ -52,9 +52,12 @@ function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { if (data[i] == 0) { res[i] = false; } - else { + else if (data[i] == 1){ res[i] = true; } + else if (data[i] == FieldTypes.C_BOOL_NULL) { + res[i] = null; + } } return res; } @@ -63,7 +66,8 @@ function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) let res = []; let currOffset = 0; while (currOffset < data.length) { - res.push(data.readIntLE(currOffset,1)); + let d = data.readIntLE(currOffset,1); + res.push(d == FieldTypes.C_TINYINT_NULL ? null : d); currOffset += nbytes; } return res; @@ -73,7 +77,8 @@ function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) let res = []; let currOffset = 0; while (currOffset < data.length) { - res.push(data.readIntLE(currOffset,2)); + let d = data.readIntLE(currOffset,2); + res.push(d == FieldTypes.C_SMALLINT_NULL ? null : d); currOffset += nbytes; } return res; @@ -83,7 +88,8 @@ function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { let res = []; let currOffset = 0; while (currOffset < data.length) { - res.push(data.readInt32LE(currOffset)); + let d = data.readInt32LE(currOffset); + res.push(d == FieldTypes.C_INT_NULL ? null : d); currOffset += nbytes; } return res; @@ -93,7 +99,8 @@ function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { let res = []; let currOffset = 0; while (currOffset < data.length) { - res.push(BigInt(data.readInt64LE(currOffset))); + let d = data.readInt64LE(currOffset); + res.push(d == FieldTypes.C_BIGINT_NULL ? null : BigInt(d)); currOffset += nbytes; } return res; @@ -103,7 +110,8 @@ function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { let res = []; let currOffset = 0; while (currOffset < data.length) { - res.push(parseFloat(data.readFloatLE(currOffset).toFixed(7))); + let d = parseFloat(data.readFloatLE(currOffset).toFixed(5)); + res.push(isNaN(d) ? null : d); currOffset += nbytes; } return res; @@ -113,7 +121,8 @@ function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { let res = []; let currOffset = 0; while (currOffset < data.length) { - res.push(parseFloat(data.readDoubleLE(currOffset).toFixed(16))); + let d = parseFloat(data.readDoubleLE(currOffset).toFixed(16)); + res.push(isNaN(d) ? null : d); currOffset += nbytes; } return res; @@ -123,8 +132,13 @@ function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { let res = []; let currOffset = 0; while (currOffset < data.length) { - let dataEntry = data.slice(currOffset, currOffset + nbytes); //one entry in a row under a column; - res.push(ref.readCString(dataEntry)); + let dataEntry = data.slice(currOffset, currOffset + nbytes); + if (dataEntry[0] == FieldTypes.C_BINARY_NULL) { + res.push(null); + } + else { + res.push(ref.readCString(dataEntry)); + } currOffset += nbytes; } return res; @@ -133,10 +147,15 @@ function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro=false) { data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset); let res = []; let currOffset = 0; - //every 4; + // every 4 bytes, a character is encoded; while (currOffset < data.length) { let dataEntry = data.slice(currOffset, currOffset + nbytes); //one entry in a row under a column; - res.push(dataEntry.toString("utf16le").replace(/\u0000/g, "")); + if (dataEntry.readInt64LE(0) == FieldTypes.C_NCHAR_NULL) { + res.push(null); + } + else { + res.push(dataEntry.toString("utf16le").replace(/\u0000/g, "")); + } currOffset += nbytes; } return res; @@ -178,7 +197,7 @@ function CTaosInterface (config = null, pass = false) { ref.types.void_ptr = ref.refType(ref.types.void); ref.types.void_ptr2 = ref.refType(ref.types.void_ptr); /*Declare a bunch of functions first*/ - /* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS */ + /* Note, pointers to TAOS_RES, TAOS, are ref.types.void_ptr. The connection._conn buffer is supplied for pointers to TAOS * */ this.libtaos = ffi.Library('libtaos', { 'taos_options': [ ref.types.int, [ ref.types.int , ref.types.void_ptr ] ], 'taos_init': [ ref.types.void, [ ] ], @@ -211,16 +230,42 @@ function CTaosInterface (config = null, pass = false) { 'taos_errno': [ ref.types.int, [ ref.types.void_ptr] ], //char *taos_errstr(TAOS *taos) 'taos_errstr': [ ref.types.char, [ ref.types.void_ptr] ], + //void taos_stop_query(TAOS_RES *res); + 'taos_stop_query': [ ref.types.void, [ ref.types.void_ptr] ], + //char *taos_get_server_info(TAOS *taos); + 'taos_get_server_info': [ ref.types.char_ptr, [ ref.types.void_ptr ] ], + //char *taos_get_client_info(); + 'taos_get_client_info': [ ref.types.char_ptr, [ ] ], // ASYNC // void taos_query_a(TAOS *taos, char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) 'taos_query_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.void_ptr ] ], // void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); - 'taos_fetch_rows_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr ]] + 'taos_fetch_rows_a': [ ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.void_ptr ]], + + // Subscription + //TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, long time, int mseconds) + ////TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, int64_t time, int mseconds); + 'taos_subscribe': [ ref.types.void_ptr, [ ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.char_ptr, ref.types.int64, ref.types.int] ], + //TAOS_ROW taos_consume(TAOS_SUB *tsub); + 'taos_consume': [ ref.refType(ref.types.void_ptr2), [ref.types.void_ptr] ], + //void taos_unsubscribe(TAOS_SUB *tsub); + 'taos_unsubscribe': [ ref.types.void, [ ref.types.void_ptr ] ], + //int taos_subfields_count(TAOS_SUB *tsub); + 'taos_subfields_count': [ ref.types.int, [ref.types.void_ptr ] ], + //TAOS_FIELD *taos_fetch_subfields(TAOS_SUB *tsub); + 'taos_fetch_subfields': [ ref.refType(TaosField), [ ref.types.void_ptr ] ], + + // Continuous Query + //TAOS_STREAM *taos_open_stream(TAOS *taos, char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), + // int64_t stime, void *param, void (*callback)(void *)); + 'taos_open_stream': [ ref.types.void_ptr, [ ref.types.void_ptr, ref.types.char_ptr, ref.types.void_ptr, ref.types.int64, ref.types.void_ptr, ref.types.void_ptr ] ], + //void taos_close_stream(TAOS_STREAM *tstr); + 'taos_close_stream': [ ref.types.void, [ ref.types.void_ptr ] ] + }); if (pass == false) { if (config == null) { - //check this buffer this._config = ref.alloc(ref.types.char_ptr, ref.NULL); } else { @@ -343,7 +388,7 @@ CTaosInterface.prototype.freeResult = function freeResult(result) { CTaosInterface.prototype.numFields = function numFields(result) { return this.libtaos.taos_num_fields(result); } -/** @deprecated */ +// Fetch fields count by connection, the latest query CTaosInterface.prototype.fieldsCount = function fieldsCount(connection) { return this.libtaos.taos_field_count(connection); } @@ -375,7 +420,7 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback, // Data preparation to pass to cursor. Could be bottleneck in query execution callback times. let row = cti.libtaos.taos_fetch_row(result2); let fields = cti.fetchFields_a(result2); - let isMicro = (cti.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO); + let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO); let blocks = new Array(fields.length); blocks.fill(null); numOfRows2 = Math.abs(numOfRows2); @@ -391,13 +436,12 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback, } callback(param2, result2, numOfRows2, blocks); } - asyncCallbackWrapper = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.int], asyncCallbackWrapper); + asyncCallbackWrapper = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.types.int ], asyncCallbackWrapper); this.libtaos.taos_fetch_rows_a(result, asyncCallbackWrapper, param); return param; } // Fetch field meta data by result handle CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) { - // let pfields = this.fetchFields(result); let pfieldscount = this.numFields(result); let fields = []; @@ -414,3 +458,157 @@ CTaosInterface.prototype.fetchFields_a = function fetchFields_a (result) { } return fields; } +// Stop a query by result handle +CTaosInterface.prototype.stopQuery = function stopQuery(result) { + if (result != null){ + this.libtaos.taos_stop_query(result); + } + else { + throw new errors.ProgrammingError("No result handle passed to stop query"); + } +} +CTaosInterface.prototype.getServerInfo = function getServerInfo(connection) { + return ref.readCString(this.libtaos.taos_get_server_info(connection)); +} +CTaosInterface.prototype.getClientInfo = function getClientInfo() { + return ref.readCString(this.libtaos.taos_get_client_info()); +} + +// Subscription +CTaosInterface.prototype.subscribe = function subscribe(host=null, user="root", password="taosdata", db=null, table=null, time=null, mseconds=null) { + let dbOrig = db; + let tableOrig = table; + try { + host = host != null ? ref.allocCString(host) : ref.alloc(ref.types.char_ptr, ref.NULL); + } + catch(err) { + throw "Attribute Error: host is expected as a str"; + } + try { + user = ref.allocCString(user) + } + catch(err) { + throw "Attribute Error: user is expected as a str"; + } + try { + password = ref.allocCString(password); + } + catch(err) { + throw "Attribute Error: password is expected as a str"; + } + try { + db = db != null ? ref.allocCString(db) : ref.alloc(ref.types.char_ptr, ref.NULL); + } + catch(err) { + throw "Attribute Error: db is expected as a str"; + } + try { + table = table != null ? ref.allocCString(table) : ref.alloc(ref.types.char_ptr, ref.NULL); + } + catch(err) { + throw TypeError("table is expected as a str"); + } + try { + mseconds = ref.alloc(ref.types.int, mseconds); + } + catch(err) { + throw TypeError("mseconds is expected as an int"); + } + //TAOS_SUB *taos_subscribe(char *host, char *user, char *pass, char *db, char *table, int64_t time, int mseconds); + let subscription = this.libtaos.taos_subscribe(host, user, password, db, table, time, mseconds); + if (ref.isNull(subscription)) { + throw new errors.TDError('Failed to subscribe to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig); + } + else { + console.log('Successfully subscribed to TDengine | Database: ' + dbOrig + ', Table: ' + tableOrig); + } + return subscription; +} +CTaosInterface.prototype.subFieldsCount = function subFieldsCount(subscription) { + return this.libtaos.taos_subfields_count(subscription); +} +CTaosInterface.prototype.fetchSubFields = function fetchSubFields(subscription) { + let pfields = this.libtaos.taos_fetch_subfields(subscription); + let pfieldscount = this.subFieldsCount(subscription); + let fields = []; + if (ref.isNull(pfields) == false) { + pfields = ref.reinterpret(pfields, 68 * pfieldscount , 0); + for (let i = 0; i < pfields.length; i += 68) { + //0 - 63 = name //64 - 65 = bytes, 66 - 67 = type + fields.push( { + name: ref.readCString(ref.reinterpret(pfields,64,i)), + bytes: pfields[i + 64], + type: pfields[i + 66] + }) + } + } + return fields; +} +CTaosInterface.prototype.consume = function consume(subscription) { + let row = this.libtaos.taos_consume(subscription); + let fields = this.fetchSubFields(subscription); + //let isMicro = (cti.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO); + let isMicro = false; //no supported function for determining precision? + let blocks = new Array(fields.length); + blocks.fill(null); + let numOfRows2 = 1; //Math.abs(numOfRows2); + let offset = 0; + if (numOfRows2 > 0){ + for (let i = 0; i < fields.length; i++) { + if (!convertFunctions[fields[i]['type']] ) { + throw new errors.DatabaseError("Invalid data type returned from database"); + } + blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro); + offset += fields[i]['bytes'] * numOfRows2; + } + } + return {blocks:blocks, fields:fields}; +} +CTaosInterface.prototype.unsubscribe = function unsubscribe(subscription) { + //void taos_unsubscribe(TAOS_SUB *tsub); + this.libtaos.taos_unsubscribe(subscription); +} + +// Continuous Query +CTaosInterface.prototype.openStream = function openStream(connection, sql, callback, stime,stoppingCallback, param = ref.ref(ref.NULL)) { + try { + sql = ref.allocCString(sql); + } + catch(err) { + throw "Attribute Error: sql string is expected as a str"; + } + var cti = this; + let asyncCallbackWrapper = function (param2, result2, row) { + let fields = cti.fetchFields_a(result2); + let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO); + let blocks = new Array(fields.length); + blocks.fill(null); + let numOfRows2 = 1; + let offset = 0; + if (numOfRows2 > 0) { + for (let i = 0; i < fields.length; i++) { + if (!convertFunctions[fields[i]['type']] ) { + throw new errors.DatabaseError("Invalid data type returned from database"); + } + blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro); + offset += fields[i]['bytes'] * numOfRows2; + } + } + callback(param2, result2, blocks, fields); + } + asyncCallbackWrapper = ffi.Callback(ref.types.void, [ ref.types.void_ptr, ref.types.void_ptr, ref.refType(ref.types.void_ptr2) ], asyncCallbackWrapper); + asyncStoppingCallbackWrapper = ffi.Callback( ref.types.void, [ ref.types.void_ptr ], stoppingCallback); + let streamHandle = this.libtaos.taos_open_stream(connection, sql, asyncCallbackWrapper, stime, param, asyncStoppingCallbackWrapper); + if (ref.isNull(streamHandle)) { + throw new errors.TDError('Failed to open a stream with TDengine'); + return false; + } + else { + console.log("Succesfully opened stream"); + return streamHandle; + } +} +CTaosInterface.prototype.closeStream = function closeStream(stream) { + this.libtaos.taos_close_stream(stream); + console.log("Closed stream"); +} diff --git a/src/connector/nodejs/nodetaos/constants.js b/src/connector/nodejs/nodetaos/constants.js index 0430f4cb41..cd6a0c9fba 100644 --- a/src/connector/nodejs/nodetaos/constants.js +++ b/src/connector/nodejs/nodetaos/constants.js @@ -38,7 +38,7 @@ module.exports = { C_NCHAR : 10, // NULL value definition // NOTE: These values should change according to C definition in tsdb.h - C_BOOL_NULL : 0x02, + C_BOOL_NULL : 2, C_TINYINT_NULL : -128, C_SMALLINT_NULL : -32768, C_INT_NULL : -2147483648, diff --git a/src/connector/nodejs/nodetaos/cursor.js b/src/connector/nodejs/nodetaos/cursor.js index d99996f44e..17b0f09864 100644 --- a/src/connector/nodejs/nodetaos/cursor.js +++ b/src/connector/nodejs/nodetaos/cursor.js @@ -7,7 +7,7 @@ const { PerformanceObserver, performance } = require('perf_hooks'); module.exports = TDengineCursor; /** - * @typedef {Object} Buffer - A Node.JS buffer. Please refer to {@link https://nodejs.org/api/buffer.html} for more details + * @typedef {Object} Buffer - A Node.js buffer. Please refer to {@link https://nodejs.org/api/buffer.html} for more details * @global */ @@ -24,27 +24,21 @@ module.exports = TDengineCursor; */ function TDengineCursor(connection=null) { //All parameters are store for sync queries only. - this._description = null; this._rowcount = -1; this._connection = null; this._result = null; this._fields = null; this.data = []; this.fields = null; - this._chandle = new CTaosInterface(null, true); //pass through, just need library loaded. if (connection != null) { this._connection = connection + this._chandle = connection._chandle //pass through, just need library loaded. + } + else { + throw new errors.ProgrammingError("A TDengineConnection object is required to be passed to the TDengineCursor"); } } -/** - * Get the description of the latest query - * @since 1.0.0 - * @return {string} Description - */ -TDengineCursor.prototype.description = function description() { - return this._description; -} /** * Get the row counts of the latest query * @since 1.0.0 @@ -53,9 +47,6 @@ TDengineCursor.prototype.description = function description() { TDengineCursor.prototype.rowcount = function rowcount() { return this._rowcount; } -TDengineCursor.prototype.callproc = function callproc() { - return; -} /** * Close the cursor by setting its connection to null and freeing results from the connection and resetting the results it has stored * @return {boolean} Whether or not the cursor was succesfully closed @@ -112,15 +103,21 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback let stmt = operation; let time = 0; - const obs = new PerformanceObserver((items) => { - time = items.getEntries()[0].duration; - performance.clearMarks(); - }); - obs.observe({ entryTypes: ['measure'] }); - performance.mark('A'); - res = this._chandle.query(this._connection._conn, stmt); - performance.mark('B'); - performance.measure('query', 'A', 'B'); + let res; + if (options['quiet'] != true) { + const obs = new PerformanceObserver((items) => { + time = items.getEntries()[0].duration; + performance.clearMarks(); + }); + obs.observe({ entryTypes: ['measure'] }); + performance.mark('A'); + res = this._chandle.query(this._connection._conn, stmt); + performance.mark('B'); + performance.measure('query', 'A', 'B'); + } + else { + res = this._chandle.query(this._connection._conn, stmt); + } if (res == 0) { let fieldCount = this._chandle.fieldsCount(this._connection._conn); @@ -139,7 +136,7 @@ TDengineCursor.prototype.execute = function execute(operation, options, callback this._fields = resAndField.fields; this.fields = resAndField.fields; wrapCB(callback); - return this._handle_result(); //return a pointer to the result + return this._result; //return a pointer to the result } } else { @@ -271,7 +268,6 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal if (resCode >= 0) { let fieldCount = cr._chandle.numFields(res2); if (fieldCount == 0) { - //get affect fields count cr._chandle.freeResult(res2); //result will no longer be needed } else { @@ -280,8 +276,6 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal } else { - //new errors.ProgrammingError(this._chandle.errStr(this._connection._conn)) - //how to get error by result handle? throw new errors.ProgrammingError("Error occuring with use of execute_a async function. Status code was returned with failure"); } } @@ -313,7 +307,7 @@ TDengineCursor.prototype.execute_a = function execute_a (operation, options, cal * @param {function} callback - callback function that is callbacked on the COMPLETE fetched data (it is calledback only once!). * Must be of form function (param, result, rowCount, rowData) * @param {Object} param - A parameter that is also passed to the main callback function. Important! Param must be an object, and the key "data" cannot be used - * @return {{param:Object, result:buffer}} An object with the passed parameters object and the buffer instance that is a pointer to the result handle. + * @return {{param:Object, result:Buffer}} An object with the passed parameters object and the buffer instance that is a pointer to the result handle. * @since 1.2.0 * @example * cursor.execute('select * from db.table'); @@ -377,27 +371,117 @@ TDengineCursor.prototype.fetchall_a = function fetchall_a(result, options, callb param = this._chandle.fetch_rows_a(result, asyncCallbackWrapper, buf); //returned param return {param:param,result:result}; } -TDengineCursor.prototype.nextset = function nextset() { - return; -} -TDengineCursor.prototype.setinputsize = function setinputsize() { - return; -} -TDengineCursor.prototype.setoutputsize = function setoutputsize(size, column=null) { - return; +/** + * Stop a query given the result handle. + * @param {Buffer} result - The buffer that acts as the result handle + * @since 1.3.0 + */ +TDengineCursor.prototype.stopQuery = function stopQuery(result) { + this._chandle.stopQuery(result); } TDengineCursor.prototype._reset_result = function _reset_result() { - this._description = null; this._rowcount = -1; this._result = null; this._fields = null; this.data = []; this.fields = null; } -TDengineCursor.prototype._handle_result = function _handle_result() { - this._description = []; - for (let field of this._fields) { - this._description.push([field.name, field.type]); - } - return this._result; +/** + * Get server info such as version number + * @return {string} + * @since 1.3.0 + */ +TDengineCursor.prototype.getServerInfo = function getServerInfo() { + return this._chandle.getServerInfo(this._connection._conn); } +/** + * Get client info such as version number + * @return {string} + * @since 1.3.0 + */ +TDengineCursor.prototype.getClientInfo = function getClientInfo() { + return this._chandle.getClientInfo(); +} +/** + * Subscribe to a table from a database in TDengine. + * @param {Object} config - A configuration object containing the configuration options for the subscription + * @param {string} config.host - The host to subscribe to + * @param {string} config.user - The user to subscribe as + * @param {string} config.password - The password for the said user + * @param {string} config.db - The db containing the table to subscribe to + * @param {string} config.table - The name of the table to subscribe to + * @param {number} config.time - The start time to start a subscription session + * @param {number} config.mseconds - The pulling period of the subscription session + * @return {Buffer} A buffer pointing to the subscription session handle + * @since 1.3.0 + */ +TDengineCursor.prototype.subscribe = function subscribe(config) { + return this._chandle.subscribe(config.host, config.user, config.password, config.db, config.table, config.time, config.mseconds); +}; +/** + * An infinite loop that consumes the latest data and calls a callback function that is provided. + * @param {Buffer} subscription - A buffer object pointing to the subscription session handle + * @param {function} callback - The callback function that takes the row data, field/column meta data, and the subscription session handle as input + * @since 1.3.0 + */ +TDengineCursor.prototype.consumeData = async function consumeData(subscription, callback) { + while (true) { + let res = this._chandle.consume(subscription); + let data = []; + let num_of_rows = res.blocks[0].length; + for (let j = 0; j < num_of_rows; j++) { + data.push([]); + let rowBlock = new Array(res.fields.length); + for (let k = 0; k < res.fields.length; k++) { + rowBlock[k] = res.blocks[k][j]; + } + data[data.length-1] = rowBlock; + } + callback(data, res.fields, subscription); + } +} +/** + * Unsubscribe the provided buffer object pointing to the subscription session handle + * @param {Buffer} subscription - A buffer object pointing to the subscription session handle that is to be unsubscribed + * @since 1.3.0 + */ +TDengineCursor.prototype.unsubscribe = function unsubscribe(subscription) { + this._chandle.unsubscribe(subscription); +} +/** + * Open a stream with TDengine to run the sql query periodically in the background + * @param {string} sql - The query to run + * @param {function} callback - The callback function to run after each query, accepting inputs as param, result handle, data, fields meta data + * @param {number} stime - The time of the stream starts in the form of epoch milliseconds. If 0 is given, the start time is set as the current time. + * @param {function} stoppingCallback - The callback function to run when the continuous query stops. It takes no inputs + * @param {object} param - A parameter that is passed to the main callback function + * @return {Buffer} A buffer pointing to the stream handle + * @since 1.3.0 + */ + TDengineCursor.prototype.openStream = function openStream(sql, callback, stime = 0, stoppingCallback, param = {}) { + let buf = ref.alloc('Object'); + ref.writeObject(buf, 0, param); + + let asyncCallbackWrapper = function (param2, result2, blocks, fields) { + let data = []; + let num_of_rows = blocks[0].length; + for (let j = 0; j < num_of_rows; j++) { + data.push([]); + let rowBlock = new Array(fields.length); + for (let k = 0; k < fields.length; k++) { + rowBlock[k] = blocks[k][j]; + } + data[data.length-1] = rowBlock; + } + callback(param2, result2, blocks, fields); + } + return this._chandle.openStream(this._connection._conn, sql, asyncCallbackWrapper, stime, stoppingCallback, buf); + } + /** + * Close a stream + * @param {Buffer} - A buffer pointing to the handle of the stream to be closed + * @since 1.3.0 + */ + TDengineCursor.prototype.closeStream = function closeStream(stream) { + this._chandle.closeStream(stream); + } diff --git a/src/connector/nodejs/nodetaos/taosresult.js b/src/connector/nodejs/nodetaos/taosresult.js index bfab94d359..fd82f4e236 100644 --- a/src/connector/nodejs/nodetaos/taosresult.js +++ b/src/connector/nodejs/nodetaos/taosresult.js @@ -15,24 +15,17 @@ module.exports = TaosResult; * @since 1.0.6 */ function TaosResult(data, fields) { - this.data = data.map(row => new TaosRow(row)); this.rowcount = this.data.length; this.fields = fields.map(field => new TaosField(field)); } - -TaosResult.prototype.parseFields = function parseFields(fields) { - return fields.map(function(field) { - return field; - }); -} /** * Pretty print data and the fields meta data as if you were using the taos shell * @memberof TaosResult * @function pretty + * @since 1.0.6 */ TaosResult.prototype.pretty = function pretty() { - // Pretty print of the fields and the data; let fieldsStr = ""; let sizing = []; this.fields.forEach((field,i) => { @@ -55,10 +48,9 @@ TaosResult.prototype.pretty = function pretty() { entry = entry.toTaosString(); } else { - entry = entry.toString(); + entry = entry == null ? 'null' : entry.toString(); } rowStr += entry - //console.log(this.fields[i]._field.bytes, suggestedWidths[this.fields[i]._field.type]); rowStr += fillEmpty(sizing[i] - entry.length) + " | "; }); console.log(rowStr); diff --git a/src/connector/nodejs/package-lock.json b/src/connector/nodejs/package-lock.json index 4ee92521da..e43b131104 100644 --- a/src/connector/nodejs/package-lock.json +++ b/src/connector/nodejs/package-lock.json @@ -1,6 +1,6 @@ { "name": "td-connector", - "version": "1.2.1", + "version": "1.3.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/src/connector/nodejs/package.json b/src/connector/nodejs/package.json index 233673e721..0f3e4fcb84 100644 --- a/src/connector/nodejs/package.json +++ b/src/connector/nodejs/package.json @@ -1,6 +1,6 @@ { "name": "td-connector", - "version": "1.2.1", + "version": "1.3.0", "description": "A Node.js connector for TDengine.", "main": "tdengine.js", "scripts": { diff --git a/src/connector/nodejs/readme.md b/src/connector/nodejs/readme.md index 627a2ed9b2..cf60b11024 100644 --- a/src/connector/nodejs/readme.md +++ b/src/connector/nodejs/readme.md @@ -130,9 +130,9 @@ console.log(cursor.data); // Latest query's result data is stored in cursor.data ### Async functionality -Async queries can be performed using the same functions such as `cursor.execute`, `cursor.query`, but now with `_a` appended to them. +Async queries can be performed using the same functions such as `cursor.execute`, `TaosQuery.query`, but now with `_a` appended to them. -Say you want to execute an two async query on two seperate tables, using `cursor.query_a`, you can do that and get a TaosQuery object, which upon executing with the `execute_a` function, returns a promise that resolves with a TaosResult object. +Say you want to execute an two async query on two separate tables, using `cursor.query`, you can do that and get a TaosQuery object, which upon executing with the `execute_a` function, returns a promise that resolves with a TaosResult object. ```javascript var promise1 = cursor.query('select count(*), avg(v1), avg(v2) from meter1;').execute_a() @@ -145,7 +145,6 @@ promise2.then(function(result) { }) ``` - ## Example An example of using the NodeJS connector to create a table with weather data and create and execute queries can be found [here](https://github.com/taosdata/TDengine/tree/master/tests/examples/nodejs/node-example.js) (The preferred method for using the connector) diff --git a/src/connector/nodejs/test/performance.js b/src/connector/nodejs/test/performance.js new file mode 100644 index 0000000000..ea197f0344 --- /dev/null +++ b/src/connector/nodejs/test/performance.js @@ -0,0 +1,89 @@ +function memoryUsageData() { + let s = process.memoryUsage() + for (key in s) { + s[key] = (s[key]/1000000).toFixed(3) + "MB"; + } + return s; +} +console.log("initial mem usage:", memoryUsageData()); + +const { PerformanceObserver, performance } = require('perf_hooks'); +const taos = require('../tdengine'); +var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0}); +var c1 = conn.cursor(); + +// Initialize env +c1.execute('create database if not exists td_connector_test;'); +c1.execute('use td_connector_test;') +c1.execute('create table if not exists all_types (ts timestamp, _int int, _bigint bigint, _float float, _double double, _binary binary(40), _smallint smallint, _tinyint tinyint, _bool bool, _nchar nchar(40));'); +c1.execute('create table if not exists stabletest (ts timestamp, v1 int, v2 int, v3 int, v4 double) tags (id int, location binary(20));') + + +// Insertion into single table Performance Test +var dataPrepTime = 0; +var insertTime = 0; +var insertTime5000 = 0; +var avgInsert5ktime = 0; +const obs = new PerformanceObserver((items) => { + let entry = items.getEntries()[0]; + + if (entry.name == 'Data Prep') { + dataPrepTime += entry.duration; + } + else if (entry.name == 'Insert'){ + insertTime += entry.duration + } + else { + console.log(entry.name + ': ' + (entry.duration/1000).toFixed(8) + 's'); + } + performance.clearMarks(); +}); +obs.observe({ entryTypes: ['measure'] }); + +function R(l,r) { + return Math.random() * (r - l) - r; +} +function randomBool() { + if (Math.random() < 0.5) { + return true; + } + return false; +} +function insertN(n) { + for (let i = 0; i < n; i++) { + performance.mark('A3'); + let insertData = ["now + " + i + "m", // Timestamp + parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // Int + parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // BigInt + parseFloat( R(-3.4E38, 3.4E38) ), // Float + parseFloat( R(-1.7E308, 1.7E308) ), // Double + "\"Long Binary\"", // Binary + parseInt( R(-32767, 32767) ), // Small Int + parseInt( R(-127, 127) ), // Tiny Int + randomBool(), + "\"Nchars 一些中文字幕\""]; // Bool + let query = 'insert into td_connector_test.all_types values(' + insertData.join(',') + ' );'; + performance.mark('B3'); + performance.measure('Data Prep', 'A3', 'B3'); + performance.mark('A2'); + c1.execute(query, {quiet:true}); + performance.mark('B2'); + performance.measure('Insert', 'A2', 'B2'); + if ( i % 5000 == 4999) { + console.log("Insert # " + (i+1)); + console.log('Insert 5k records: ' + ((insertTime - insertTime5000)/1000).toFixed(8) + 's'); + insertTime5000 = insertTime; + avgInsert5ktime = (avgInsert5ktime/1000 * Math.floor(i / 5000) + insertTime5000/1000) / Math.ceil( i / 5000); + console.log('DataPrepTime So Far: ' + (dataPrepTime/1000).toFixed(8) + 's | Inserting time So Far: ' + (insertTime/1000).toFixed(8) + 's | Avg. Insert 5k time: ' + avgInsert5ktime.toFixed(8)); + + + } + } +} +performance.mark('insert 1E5') +insertN(1E5); +performance.mark('insert 1E5 2') +performance.measure('Insert With Logs', 'insert 1E5', 'insert 1E5 2'); +console.log('DataPrepTime: ' + (dataPrepTime/1000).toFixed(8) + 's | Inserting time: ' + (insertTime/1000).toFixed(8) + 's'); +dataPrepTime = 0; insertTime = 0; +//'insert into td_connector_test.all_types values (now, null,null,null,null,null,null,null,null,null);' diff --git a/src/connector/nodejs/test/test.js b/src/connector/nodejs/test/test.js index 6c23fa9999..67f0a783b9 100644 --- a/src/connector/nodejs/test/test.js +++ b/src/connector/nodejs/test/test.js @@ -1,5 +1,5 @@ const taos = require('../tdengine'); -var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0}); +var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:10}); var c1 = conn.cursor(); let stime = new Date(); let interval = 1000; @@ -23,14 +23,13 @@ function randomBool() { c1.execute('create database if not exists td_connector_test;'); c1.execute('use td_connector_test;') c1.execute('create table if not exists all_types (ts timestamp, _int int, _bigint bigint, _float float, _double double, _binary binary(40), _smallint smallint, _tinyint tinyint, _bool bool, _nchar nchar(40));'); - +c1.execute('create table if not exists stabletest (ts timestamp, v1 int, v2 int, v3 int, v4 double) tags (id int, location binary(20));') // Shell Test : The following uses the cursor to imitate the taos shell // Insert -for (let i = 0; i < 5000; i++) { - stime.setMilliseconds(stime.getMilliseconds() + interval); - let insertData = [convertDateToTS(stime), // Timestamp +for (let i = 0; i < 10000; i++) { + let insertData = ["now+" + i + "s", // Timestamp parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // Int parseInt( R(-Math.pow(2,31) + 1 , Math.pow(2,31) - 1) ), // BigInt parseFloat( R(-3.4E38, 3.4E38) ), // Float @@ -58,23 +57,80 @@ var d = c1.fetchall(); console.log(c1.fields); console.log(d); -//Immediate Execution like the Shell +// Immediate Execution like the Shell -c1.query('select count(*), stddev(_double), min(_tinyint) from all_types where _tinyint > 50 and _int < 0', true).then(function(result){ +c1.query('select count(*), stddev(_double), min(_tinyint) from all_types where _tinyint > 50 and _int < 0;', true).then(function(result){ result.pretty(); }) c1.query('select _tinyint, _bool from all_types where _tinyint > 50 and _int < 0 limit 50;', true).then(function(result){ result.pretty(); }) -c1.query('select stddev(_double), stddev(_bigint), stddev(_float) from all_types', true).then(function(result){ +c1.query('select stddev(_double), stddev(_bigint), stddev(_float) from all_types;', true).then(function(result){ + result.pretty(); +}) +c1.query('select stddev(_double), stddev(_bigint), stddev(_float) from all_types interval(1m) limit 100;', true).then(function(result){ result.pretty(); }) -var q = c1.query('select * from td_connector_test.all_types where ts >= ? and _int > ? limit 100 offset 40;').bind(new Date(1231), 100).execute().then(function(r) { +// Binding arguments, and then using promise +var q = c1.query('select * from td_connector_test.all_types where ts >= ? and _int > ? limit 100 offset 40;').bind(new Date(1231), 100) +console.log(q.query); +q.execute().then(function(r) { r.pretty(); }); -console.log(q._query); -c1.execute('drop database td_connector_test;') + +// Raw Async Testing (Callbacks, not promises) +function cb2(param, result, rowCount, rd) { + console.log("RES *", result); + console.log("Async fetched", rowCount, "rows"); + console.log("Passed Param: ", param); + console.log("Fields", rd.fields); + console.log("Data", rd.data); + +} +function cb1(param,result,code) { + console.log('Callbacked!'); + console.log("RES *", result); + console.log("Status: ", code); + console.log("Passed Param", param); + c1.fetchall_a(result, cb2, param) +} + +c1.execute_a("describe td_connector_test.all_types;", cb1, {myparam:3.141}); + +function cb4(param, result, rowCount, rd) { + console.log("RES *", result); + console.log("Async fetched", rowCount, "rows"); + console.log("Passed Param: ", param); + console.log("Fields", rd.fields); + console.log("Data", rd.data); + +} +// Without directly calling fetchall_a +var thisRes; +function cb3(param,result,code) { + console.log('Callbacked!'); + console.log("RES *", result); + console.log("Status: ", code); + console.log("Passed Param", param); + thisRes = result; +} +//Test calling execute and fetchall seperately and not through callbacks +var param = c1.execute_a("describe td_connector_test.all_types;", cb3, {e:2.718}); +console.log("Passed Param outside of callback: ", param); +setTimeout(function(){ + c1.fetchall_a(thisRes, cb4, param); +},100); + +// Async through promises +var aq = c1.query('select count(*) from td_connector_test.all_types;') +aq.execute_a().then(function(data) { + data.pretty(); +}) +c1.query('describe td_connector_test.stabletest;').execute_a().then(r=> r.pretty()); +setTimeout(function(){ + c1.query('drop database td_connector_test;'); +},2000); conn.close();