diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 590f205e1d..b15286fe80 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -53,12 +53,18 @@ typedef struct SParsedDataColInfo { bool hasVal[TSDB_MAX_COLUMNS]; } SParsedDataColInfo; +#pragma pack(push,1) +// this struct is transfered as binary, padding two bytes to avoid +// an 'uid' whose low bytes is 0xff being recoginized as NULL, +// and set 'pack' to 1 to avoid break existing code. typedef struct STidTags { + int16_t padding; int64_t uid; int32_t tid; int32_t vgId; char tag[]; } STidTags; +#pragma pack(pop) typedef struct SJoinSupporter { SSubqueryState* pState; @@ -224,6 +230,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); void tscInitQueryInfo(SQueryInfo* pQueryInfo); void tscClearSubqueryInfo(SSqlCmd* pCmd); +void tscFreeVgroupTableInfo(SArray* pVgroupTables); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index d1f5b510a4..9c0951cdd6 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -169,7 +169,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI // (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct *type = TSDB_DATA_TYPE_BINARY; - *bytes = (int16_t)(dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); + *bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); *interBytes = *bytes; return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 4d4254fa46..2977ed01ec 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -912,7 +912,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; pQueryMsg->head.contLen = htonl(msgLen); - assert(msgLen + minMsgSize() <= size); + assert(msgLen + minMsgSize() <= pCmd->allocSize); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 81d73236a4..f740e3cce9 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -230,6 +230,19 @@ static SArray* getTableList( SSqlObj* pSql ) { return result; } +static int32_t compareTidTag(const void* p1, const void* p2) { + const STidTags* t1 = (const STidTags*)p1; + const STidTags* t2 = (const STidTags*)p2; + + if (t1->vgId != t2->vgId) { + return (t1->vgId > t2->vgId) ? 1 : -1; + } + if (t1->tid != t2->tid) { + return (t1->tid > t2->tid) ? 1 : -1; + } + return 0; +} + static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { SSqlObj* pSql = pSub->pSql; @@ -270,7 +283,8 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { pSub->progress = progress; if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - taosArraySort( tables, tscCompareTidTags ); + taosArraySort( tables, compareTidTag ); + tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscBuildVgroupTableInfo(pSql, pTableMetaInfo, tables); } taosArrayDestroy(tables); @@ -410,6 +424,9 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { } } + size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo); + size += sizeof(SQueryTableMsg) + 4096; + tscAllocPayload(&pSql->cmd, size); for (int retry = 0; retry < 3; retry++) { tscRemoveFromSqlList(pSql); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index fc60e24d8e..1b6d18be0c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1556,12 +1556,22 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) { } } +void tscFreeVgroupTableInfo(SArray* pVgroupTables) { + if (pVgroupTables != NULL) { + for (size_t i = 0; i < taosArrayGetSize(pVgroupTables); i++) { + SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); + taosArrayDestroy(pInfo->itemList); + } + taosArrayDestroy(pVgroupTables); + } +} + void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { tscDebug("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); - + tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); free(pTableMetaInfo); } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 22c2bb5c9a..be3d476be5 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6549,12 +6549,15 @@ static void buildTagQueryResult(SQInfo* pQInfo) { int32_t i = pQInfo->tableIndex++; STableQueryInfo *item = taosArrayGetP(pa, i); - char *output = pQuery->sdata[0]->data + i * rsize; + char *output = pQuery->sdata[0]->data + count * rsize; varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); output = varDataVal(output); STableId* id = TSDB_TABLEID(item->pTable); + *(int16_t *)output = 0; + output += sizeof(int16_t); + *(int64_t *)output = id->uid; // memory align problem, todo serialize output += sizeof(id->uid); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 97fc467ba4..17b0239e3b 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2444,11 +2444,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); destroyTableMemIterator(pTableCheckInfo); - if (pTableCheckInfo->pDataCols != NULL) { - taosTFree(pTableCheckInfo->pDataCols->buf); - } - - taosTFree(pTableCheckInfo->pDataCols); + tdFreeDataCols(pTableCheckInfo->pDataCols); + pTableCheckInfo->pDataCols = NULL; taosTFree(pTableCheckInfo->pCompInfo); } taosArrayDestroy(pQueryHandle->pTableCheckInfo); diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index f9acf2bb10..db5ad34ee7 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -7,28 +7,31 @@ #include // include TDengine header file #include +int nTotalRows; + void print_result(TAOS_RES* res, int blockFetch) { TAOS_ROW row = NULL; int num_fields = taos_num_fields(res); TAOS_FIELD* fields = taos_fetch_fields(res); int nRows = 0; + char buf[4096]; + if (blockFetch) { nRows = taos_fetch_block(res, &row); for (int i = 0; i < nRows; i++) { - char temp[256]; - taos_print_row(temp, row + i, fields, num_fields); - puts(temp); + taos_print_row(buf, row + i, fields, num_fields); + puts(buf); } } else { while ((row = taos_fetch_row(res))) { - char temp[256]; - taos_print_row(temp, row, fields, num_fields); - puts(temp); + taos_print_row(buf, row, fields, num_fields); + puts(buf); nRows++; } } + nTotalRows += nRows; printf("%d rows consumed.\n", nRows); } @@ -52,47 +55,52 @@ void check_row_count(int line, TAOS_RES* res, int expected) { } +void do_query(TAOS* taos, const char* sql) { + TAOS_RES* res = taos_query(taos, "drop database if exists test;"); + taos_free_result(res); +} + + void run_test(TAOS* taos) { - taos_query(taos, "drop database if exists test;"); + do_query(taos, "drop database if exists test;"); usleep(100000); - //taos_query(taos, "create database test tables 5;"); - taos_query(taos, "create database test;"); + do_query(taos, "create database test;"); usleep(100000); - taos_query(taos, "use test;"); + do_query(taos, "use test;"); usleep(100000); - taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);"); + do_query(taos, "create table meters(ts timestamp, a int) tags(area int);"); - taos_query(taos, "create table t0 using meters tags(0);"); - taos_query(taos, "create table t1 using meters tags(1);"); - taos_query(taos, "create table t2 using meters tags(2);"); - taos_query(taos, "create table t3 using meters tags(3);"); - taos_query(taos, "create table t4 using meters tags(4);"); - taos_query(taos, "create table t5 using meters tags(5);"); - taos_query(taos, "create table t6 using meters tags(6);"); - taos_query(taos, "create table t7 using meters tags(7);"); - taos_query(taos, "create table t8 using meters tags(8);"); - taos_query(taos, "create table t9 using meters tags(9);"); + do_query(taos, "create table t0 using meters tags(0);"); + do_query(taos, "create table t1 using meters tags(1);"); + do_query(taos, "create table t2 using meters tags(2);"); + do_query(taos, "create table t3 using meters tags(3);"); + do_query(taos, "create table t4 using meters tags(4);"); + do_query(taos, "create table t5 using meters tags(5);"); + do_query(taos, "create table t6 using meters tags(6);"); + do_query(taos, "create table t7 using meters tags(7);"); + do_query(taos, "create table t8 using meters tags(8);"); + do_query(taos, "create table t9 using meters tags(9);"); - taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);"); - taos_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);"); - taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);"); - taos_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);"); - taos_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);"); - taos_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);"); - taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);"); - taos_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);"); - taos_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);"); - taos_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);"); - taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);"); - taos_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);"); - taos_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);"); - taos_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);"); - taos_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);"); - taos_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);"); - taos_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);"); - taos_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);"); + do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);"); + do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);"); + do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);"); // super tables subscription usleep(1000000); @@ -104,23 +112,23 @@ void run_test(TAOS* taos) { res = taos_consume(tsub); check_row_count(__LINE__, res, 0); - taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);"); - taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);"); + do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);"); + do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 2); - taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);"); - taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);"); + do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 2); - taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);"); + do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 1); // keep progress information and restart subscription taos_unsubscribe(tsub, 1); - taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);"); + do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);"); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); res = taos_consume(tsub); check_row_count(__LINE__, res, 24); @@ -147,7 +155,7 @@ void run_test(TAOS* taos) { res = taos_consume(tsub); check_row_count(__LINE__, res, 0); - taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);"); + do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);"); res = taos_consume(tsub); check_row_count(__LINE__, res, 1); @@ -223,7 +231,7 @@ int main(int argc, char *argv[]) { exit(0); } - taos_query(taos, "use test;"); + taos_select_db(taos, "test"); TAOS_SUB* tsub = NULL; if (async) { // create an asynchronized subscription, the callback function will be called every 1s @@ -251,6 +259,7 @@ int main(int argc, char *argv[]) { } } + printf("total rows consumed: %d\n", nTotalRows); taos_unsubscribe(tsub, keep); taos_close(taos); diff --git a/tests/pytest/subscribe/stability.py b/tests/pytest/subscribe/stability.py new file mode 100644 index 0000000000..ddd8b3282a --- /dev/null +++ b/tests/pytest/subscribe/stability.py @@ -0,0 +1,93 @@ +################################################################### + # Copyright (c) 2020 by TAOS Technologies, Inc. + # All rights reserved. + # + # This file is proprietary and confidential to TAOS Technologies. + # No part of this file may be reproduced, stored, transmitted, + # disclosed or used in any form or by any means other than as + # expressly provided by the written permission from Jianhui Tao + # +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +import time +import random +import string +from util.log import * +from util.cases import * +from util.sql import * +from util.sub import * + +class TDTestCase: + maxTables = 10000 + maxCols = 50 + rowsPerSecond = 1000 + + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdLog.notice("NOTE: this case does not stop automatically, Ctrl+C to stop") + tdSql.init(conn.cursor(), logSql) + self.conn = conn + + + def generateString(self, length): + chars = string.ascii_uppercase + string.ascii_lowercase + v = "" + for i in range(length): + v += random.choice(chars) + return v + + + def insert(self): + id = random.randint(0, self.maxTables - 1) + cola = self.generateString(40) + sql = "insert into car%d values(now, '%s', %f, %d" % (id, cola, random.random()*100, random.randint(0, 2)) + for i in range(self.maxCols): + sql += ", %d" % random.randint(0, self.maxTables) + sql += ")" + tdSql.execute(sql) + + + def prepare(self): + tdLog.info("prepare database: test") + tdSql.execute('reset query cache') + tdSql.execute('drop database if exists test') + tdSql.execute('create database test') + tdSql.execute('use test') + + def run(self): + self.prepare() + + sql = "create table cars (ts timestamp, a binary(50), b float, c bool" + for i in range(self.maxCols): + sql += ", c%d int" % i + sql += ") tags(id int, category binary(30), brand binary(30));" + tdSql.execute(sql) + + for i in range(self.maxTables): + tdSql.execute("create table car%d using cars tags(%d, 'category%d', 'brand%d')" % (i, i, i % 30, i // 30)) + + time.sleep(0.1) + + total = 0 + while True: + start = time.time() + for i in range(self.rowsPerSecond): + self.insert() + total = total + 1 + d = time.time() - start + tdLog.info("%d rows inserted in %f seconds, total %d" % (self.rowsPerSecond, d, total)) + if d < 1: + time.sleep(1 - d) + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/subscribe/supertable.py b/tests/pytest/subscribe/supertable.py index c6cc2969aa..ee3aa225bd 100644 --- a/tests/pytest/subscribe/supertable.py +++ b/tests/pytest/subscribe/supertable.py @@ -31,16 +31,19 @@ class TDTestCase: now = int(time.time() * 1000) tdSql.prepare() - tdLog.info("create a super table and 10 sub-tables, then insert 5 rows into each sub-table.") + numTables = 2000 + rowsPerTable = 5 + totalRows = numTables * rowsPerTable + tdLog.info("create a super table and %d sub-tables, then insert %d rows into each sub-table." % (numTables, rowsPerTable)) tdSql.execute("create table meters(ts timestamp, a int, b int) tags(area int, loc binary(20));") - for i in range(0, 10): - for j in range(0, 5): + for i in range(0, numTables): + for j in range(0, rowsPerTable): tdSql.execute("insert into t%d using meters tags(%d, 'area%d') values (%d, %d, %d);" % (i, i, i, now + j, j, j)) tdLog.info("consumption 01.") tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0)) tdSub.consume() - tdSub.checkRows(50) + tdSub.checkRows(totalRows) tdLog.info("consumption 02: no new rows inserted") tdSub.consume() @@ -61,17 +64,17 @@ class TDTestCase: tdSub.close(False) tdSub.init(self.conn.subscribe(False, topic, sqlstr, 0)) tdSub.consume() - tdSub.checkRows(51) + tdSub.checkRows(totalRows + 1) tdLog.info("consumption 06: keep progress and restart the subscription") tdSub.close(True) tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0)) tdSub.consume() - tdSub.checkRows(51) + tdSub.checkRows(totalRows + 1) tdLog.info("consumption 07: insert one row to two table then remove one table") tdSql.execute("insert into t0 values (%d, 11, 11);" % (now + 11)) - tdSql.execute("insert into t1 values (%d, 11, 11);" % (now + 11)) + tdSql.execute("insert into t%d values (%d, 11, 11);" % ((numTables-1), (now + 11))) tdSql.execute("drop table t0") tdSub.consume() tdSub.checkRows(1) @@ -80,7 +83,7 @@ class TDTestCase: tdSub.close(False) tdSub.init(self.conn.subscribe(True, topic, sqlstr + " where ts > %d" % now, 0)) tdSub.consume() - tdSub.checkRows(37) + tdSub.checkRows((numTables-1) * (rowsPerTable-1) + 1) tdLog.info("consumption 09: insert large timestamp to t2 then insert smaller timestamp to t1") tdSql.execute("insert into t2 values (%d, 100, 100);" % (now + 100)) @@ -101,10 +104,15 @@ class TDTestCase: tdLog.info("consumption 11: two vnodes") tdSql.execute("insert into t2 values (%d, 102, 100);" % (now + 104)) - tdSql.execute("insert into t9 values (%d, 102, 100);" % (now + 104)) + tdSql.execute("insert into t1299 values (%d, 102, 100);" % (now + 104)) tdSub.consume() tdSub.checkRows(2) + tdLog.info("consumption 12: create a new table") + tdSql.execute("insert into t%d using meters tags(%d, 'area%d') values (%d, 102, 100);" % (numTables, numTables, numTables, now + 105)) + tdSub.consume() + tdSub.checkRows(1) + def stop(self): tdSub.close(False) tdSql.close()