From 1d8af5d4cf18a68b340de42877f60410d0f70e94 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 May 2022 15:11:49 +0800 Subject: [PATCH 1/9] fix(query): fix memory leak. --- include/libs/executor/executor.h | 9 --------- source/libs/executor/src/executorimpl.c | 7 ++++--- source/libs/executor/src/scanoperator.c | 7 +++++++ 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index ad57cbf4e4..e0fce2aa6a 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t */ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); -/** - * release the query handle and decrease the reference count in cache - * @param pMgmt - * @param pQInfo - * @param freeHandle - * @return - */ -void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); - void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 46c30c43c7..f9b06f360f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4184,6 +4184,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { pOperator->numOfDownstream = 0; } + taosMemoryFree(pOperator->pExpr); taosMemoryFreeClear(pOperator->info); taosMemoryFreeClear(pOperator); } @@ -4195,8 +4196,6 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); - // pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); - // pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ || pAggSup->pResultRowHashTable == NULL) { @@ -4358,6 +4357,7 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + cleanupAggSup(&pInfo->aggSup); } void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { @@ -4687,7 +4687,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); - pTaskInfo->id.str = strdup(p); + pTaskInfo->id.str = p; return pTaskInfo; } @@ -5287,6 +5287,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); + destroyOperatorInfo(pTaskInfo->pRoot); // taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosHashCleanup(pTaskInfo->summary.operatorProfResults); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b28a65d1d2..132c279218 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -670,6 +670,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { metaCloseTbCursor(pInfo->pCur); } + + taosArrayDestroy(pInfo->scanCols); } EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { @@ -1248,6 +1250,11 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { SExprInfo* pExprInfo = &pOperator->pExpr[0]; SSDataBlock* pRes = pInfo->pRes; + if (taosArrayGetSize(pInfo->pTableGroups->pGroupList) == 0) { + setTaskStatus(pTaskInfo, TASK_COMPLETED); + return NULL; + } + SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0); char str[512] = {0}; From 8a732b363ab34fe11a4492c60188b45d5cf51df8 Mon Sep 17 00:00:00 2001 From: jiacy-jcy <714897623@qq.com> Date: Sat, 7 May 2022 15:15:36 +0800 Subject: [PATCH 2/9] add case for diff --- tests/system-test/2-query/diff.py | 150 ++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 tests/system-test/2-query/diff.py diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py new file mode 100644 index 0000000000..696366428b --- /dev/null +++ b/tests/system-test/2-query/diff.py @@ -0,0 +1,150 @@ +from util.log import * +from util.cases import * +from util.sql import * +import numpy as np + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + self.rowNum = 10 + self.ts = 1537146000000 + self.perfix = 'dev' + self.tables = 10 + + def insertData(self): + print("==============step1") + tdSql.execute( + "create table if not exists st (ts timestamp, col int) tags(dev nchar(50))") + + for i in range(self.tables): + tdSql.execute("create table %s%d using st tags(%d)" % (self.perfix, i, i)) + rows = 15 + i + for j in range(rows): + tdSql.execute("insert into %s%d values(%d, %d)" %(self.perfix, i, self.ts + i * 20 * 10000 + j * 10000, j)) + + def run(self): + tdSql.prepare() + + tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, + col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''') + tdSql.execute("create table stb_1 using stb tags('beijing')") + # tdSql.execute("insert into stb_1 values(%d, 0, 0, 0, 0, 0.0, 0.0, False, ' ', ' ', 0, 0, 0, 0)" % (self.ts - 1)) + + # # diff verifacation + # tdSql.query("select diff(col1) from stb_1") + # tdSql.checkRows(0) + + # tdSql.query("select diff(col2) from stb_1") + # tdSql.checkRows(0) + + # tdSql.query("select diff(col3) from stb_1") + # tdSql.checkRows(0) + + # tdSql.query("select diff(col4) from stb_1") + # tdSql.checkRows(0) + + # tdSql.query("select diff(col5) from stb_1") + # tdSql.checkRows(0) + + # tdSql.query("select diff(col6) from stb_1") + # tdSql.checkRows(0) + + for i in range(self.rowNum): + tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" + % (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) + + tdSql.error("select diff(ts) from stb") + tdSql.error("select diff(ts) from stb_1") + tdSql.error("select diff(col1) from stb") + tdSql.error("select diff(col2) from stb") + tdSql.error("select diff(col3) from stb") + tdSql.error("select diff(col4) from stb") + tdSql.error("select diff(col5) from stb") + tdSql.error("select diff(col6) from stb") + tdSql.error("select diff(col7) from stb") + tdSql.error("select diff(col7) from stb_1") + tdSql.error("select diff(col8) from stb") + tdSql.error("select diff(col8) from stb_1") + tdSql.error("select diff(col9) from stb") + tdSql.error("select diff(col9) from stb_1") + tdSql.error("select diff(col11) from stb_1") + tdSql.error("select diff(col12) from stb_1") + tdSql.error("select diff(col13) from stb_1") + tdSql.error("select diff(col14) from stb_1") + tdSql.error("select diff(col11) from stb") + tdSql.error("select diff(col12) from stb") + tdSql.error("select diff(col13) from stb") + tdSql.error("select diff(col14) from stb") + + tdSql.query("select ts,diff(col1),ts from stb_1") + tdSql.checkRows(10) + tdSql.checkData(0, 0, "2018-09-17 09:00:00.000") + tdSql.checkData(0, 1, "2018-09-17 09:00:00.000") + tdSql.checkData(0, 3, "2018-09-17 09:00:00.000") + tdSql.checkData(9, 0, "2018-09-17 09:00:00.009") + tdSql.checkData(9, 1, "2018-09-17 09:00:00.009") + tdSql.checkData(9, 3, "2018-09-17 09:00:00.009") + + tdSql.query("select ts,diff(col1),ts from stb group by tbname") + tdSql.checkRows(10) + tdSql.checkData(0, 0, "2018-09-17 09:00:00.000") + tdSql.checkData(0, 1, "2018-09-17 09:00:00.000") + tdSql.checkData(0, 3, "2018-09-17 09:00:00.000") + tdSql.checkData(9, 0, "2018-09-17 09:00:00.009") + tdSql.checkData(9, 1, "2018-09-17 09:00:00.009") + tdSql.checkData(9, 3, "2018-09-17 09:00:00.009") + + tdSql.query("select ts,diff(col1),ts from stb_1") + tdSql.checkRows(10) + tdSql.checkData(0, 0, "2018-09-17 09:00:00.000") + tdSql.checkData(0, 1, "2018-09-17 09:00:00.000") + tdSql.checkData(0, 3, "2018-09-17 09:00:00.000") + tdSql.checkData(9, 0, "2018-09-17 09:00:00.009") + tdSql.checkData(9, 1, "2018-09-17 09:00:00.009") + tdSql.checkData(9, 3, "2018-09-17 09:00:00.009") + + tdSql.query("select ts,diff(col1),ts from stb group by tbname") + tdSql.checkRows(10) + tdSql.checkData(0, 0, "2018-09-17 09:00:00.000") + tdSql.checkData(0, 1, "2018-09-17 09:00:00.000") + tdSql.checkData(0, 3, "2018-09-17 09:00:00.000") + tdSql.checkData(9, 0, "2018-09-17 09:00:00.009") + tdSql.checkData(9, 1, "2018-09-17 09:00:00.009") + tdSql.checkData(9, 3, "2018-09-17 09:00:00.009") + + tdSql.query("select diff(col1) from stb_1") + tdSql.checkRows(10) + + tdSql.query("select diff(col2) from stb_1") + tdSql.checkRows(10) + + tdSql.query("select diff(col3) from stb_1") + tdSql.checkRows(10) + + tdSql.query("select diff(col4) from stb_1") + tdSql.checkRows(10) + + tdSql.query("select diff(col5) from stb_1") + tdSql.checkRows(10) + + tdSql.query("select diff(col6) from stb_1") + tdSql.checkRows(10) + + self.insertData() + + tdSql.query("select diff(col) from st group by tbname") + tdSql.checkRows(185) + + tdSql.error("select diff(col) from st group by dev") + + tdSql.error("select diff(col) from st group by col") + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file From 00c0f455f326dee9b083eb4886725b5cec187267 Mon Sep 17 00:00:00 2001 From: jiacy-jcy <714897623@qq.com> Date: Sat, 7 May 2022 16:22:11 +0800 Subject: [PATCH 3/9] update --- tests/system-test/2-query/diff.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index 696366428b..82c450771f 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -31,26 +31,26 @@ class TDTestCase: tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''') tdSql.execute("create table stb_1 using stb tags('beijing')") - # tdSql.execute("insert into stb_1 values(%d, 0, 0, 0, 0, 0.0, 0.0, False, ' ', ' ', 0, 0, 0, 0)" % (self.ts - 1)) + tdSql.execute("insert into stb_1 values(%d, 0, 0, 0, 0, 0.0, 0.0, False, ' ', ' ', 0, 0, 0, 0)" % (self.ts - 1)) - # # diff verifacation - # tdSql.query("select diff(col1) from stb_1") - # tdSql.checkRows(0) + # diff verifacation + tdSql.query("select diff(col1) from stb_1") + tdSql.checkRows(0) - # tdSql.query("select diff(col2) from stb_1") - # tdSql.checkRows(0) + tdSql.query("select diff(col2) from stb_1") + tdSql.checkRows(0) - # tdSql.query("select diff(col3) from stb_1") - # tdSql.checkRows(0) + tdSql.query("select diff(col3) from stb_1") + tdSql.checkRows(0) - # tdSql.query("select diff(col4) from stb_1") - # tdSql.checkRows(0) + tdSql.query("select diff(col4) from stb_1") + tdSql.checkRows(0) - # tdSql.query("select diff(col5) from stb_1") - # tdSql.checkRows(0) + tdSql.query("select diff(col5) from stb_1") + tdSql.checkRows(0) - # tdSql.query("select diff(col6) from stb_1") - # tdSql.checkRows(0) + tdSql.query("select diff(col6) from stb_1") + tdSql.checkRows(0) for i in range(self.rowNum): tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" From f508da9e139e2e404e59fc53af15f231a05b64c3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 May 2022 16:22:52 +0800 Subject: [PATCH 4/9] fix(query): eliminate memory leak --- include/common/tcommon.h | 2 -- source/libs/executor/src/executorimpl.c | 36 +++++++++++++++++++------ source/libs/executor/src/scanoperator.c | 2 +- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 16653ebfee..84ea208508 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -115,8 +115,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks); void colDataDestroy(SColumnInfoData* pColData); static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { - // WARNING: do not use info.numOfCols, - // sometimes info.numOfCols != array size int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f9b06f360f..321b7b68a5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1225,6 +1225,8 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { taosVariantDestroy(&pCtx[i].tag); taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx); + taosMemoryFree(pCtx[i].input.pData); + taosMemoryFree(pCtx[i].input.pColumnDataAgg); } taosMemoryFreeClear(pCtx); @@ -2840,9 +2842,9 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) { - blockDataEnsureCapacity(pRes, numOfRows); - if (pColList == NULL) { // data from other sources + blockDataEnsureCapacity(pRes, numOfRows); + int32_t dataLen = *(int32_t*)pData; pData += sizeof(int32_t); @@ -2898,20 +2900,23 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI pStart += sizeof(SSysTableSchema); } - SSDataBlock block = {.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)), .info.numOfCols = numOfCols}; + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {0}; idata.info.type = pSchema[i].type; idata.info.bytes = pSchema[i].bytes; idata.info.colId = pSchema[i].colId; - taosArrayPush(block.pDataBlock, &idata); + taosArrayPush(pBlock->pDataBlock, &idata); if (IS_VAR_DATA_TYPE(idata.info.type)) { - block.info.hasVarCol = true; + pBlock->info.hasVarCol = true; } } - blockDataEnsureCapacity(&block, numOfRows); + blockDataEnsureCapacity(pBlock, numOfRows); int32_t dataLen = *(int32_t*)pStart; uint64_t groupId = *(uint64_t*)(pStart + sizeof(int32_t)); @@ -2924,7 +2929,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI colLen[i] = htonl(colLen[i]); ASSERT(colLen[i] >= 0); - SColumnInfoData* pColInfoData = taosArrayGet(block.pDataBlock, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { pColInfoData->varmeta.length = colLen[i]; pColInfoData->varmeta.allocLen = colLen[i]; @@ -2943,7 +2948,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI } // data from mnode - relocateColumnData(pRes, pColList, block.pDataBlock); + relocateColumnData(pRes, pColList, pBlock->pDataBlock); + taosArrayDestroy(pBlock->pDataBlock); + taosMemoryFree(pBlock); +// blockDataDestroy(pBlock); } pRes->info.rows = numOfRows; @@ -4184,6 +4192,17 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { pOperator->numOfDownstream = 0; } + if (pOperator->pExpr != NULL) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { + SExprInfo* pExprInfo = &pOperator->pExpr[i]; + if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) { + taosMemoryFree(pExprInfo->base.pParam[0].pCol); + } + taosMemoryFree(pExprInfo->base.pParam); + taosMemoryFree(pExprInfo->pExpr); + } + } + taosMemoryFree(pOperator->pExpr); taosMemoryFreeClear(pOperator->info); taosMemoryFreeClear(pOperator); @@ -4358,6 +4377,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); cleanupAggSup(&pInfo->aggSup); + taosArrayDestroy(pInfo->pPseudoColInfo); } void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 132c279218..2b94c5fdce 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1156,7 +1156,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfExprs = pResBlock->info.numOfCols; + pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; From 714ab7b5aa940160369ac7bcb6d55338bcfc8664 Mon Sep 17 00:00:00 2001 From: jiacy-jcy <714897623@qq.com> Date: Sat, 7 May 2022 16:28:09 +0800 Subject: [PATCH 5/9] update test case --- tests/system-test/2-query/Timediff.py | 11 ++++++++++- tests/system-test/fulltest.sh | 3 ++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/Timediff.py b/tests/system-test/2-query/Timediff.py index 2824fea5a2..ad64d29007 100644 --- a/tests/system-test/2-query/Timediff.py +++ b/tests/system-test/2-query/Timediff.py @@ -181,7 +181,16 @@ class TDTestCase: tdSql.error("select timediff(10,1,1.5) from stb") # tdSql.error("select timediff(10,1,2s) from stb") # tdSql.error("select timedifff(10,1,c1) from stb") - + tdSql.error("select timediff(1.5,1.5) from stb_1") + tdSql.error("select timediff(1) from stb_1") + tdSql.error("select timediff(10,1,1.5) from stb_1") + # tdSql.error("select timediff(10,1,2s) from stb_1") + # tdSql.error("select timedifff(10,1,c1) from stb_1") + tdSql.error("select timediff(1.5,1.5) from ntb") + tdSql.error("select timediff(1) from ntb") + tdSql.error("select timediff(10,1,1.5) from ntb") + # tdSql.error("select timediff(10,1,2s) from ntb") + # tdSql.error("select timedifff(10,1,c1) from ntb") diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 22dc59365a..cb9d472116 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -23,7 +23,8 @@ python3 ./test.py -f 2-query/last.py python3 ./test.py -f 2-query/To_unixtimestamp.py python3 ./test.py -f 2-query/timetruncate.py -# python3 ./test.py -f 2-query/Timediff.py +python3 ./test.py -f 2-query/Timediff.py +# python3 ./test.py -f 2-query/diff.py #python3 ./test.py -f 2-query/cast.py From 47b8027ac196377b3cf89fc3dc626d34c514d114 Mon Sep 17 00:00:00 2001 From: jiacy-jcy <714897623@qq.com> Date: Sat, 7 May 2022 16:31:58 +0800 Subject: [PATCH 6/9] update last.py --- tests/system-test/2-query/last.py | 36 +++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/system-test/2-query/last.py b/tests/system-test/2-query/last.py index b5c0498e40..531afd0117 100644 --- a/tests/system-test/2-query/last.py +++ b/tests/system-test/2-query/last.py @@ -21,12 +21,12 @@ class TDTestCase: tdSql.execute("insert into stb_1(ts) values(%d)" % (self.ts - 1)) # last verifacation - # tdSql.query("select last(*) from stb_1") - # tdSql.checkRows(1) - # tdSql.checkData(0, 1, None) - # tdSql.query("select last(*) from db.stb_1") - # tdSql.checkRows(1) - # tdSql.checkData(0, 1, None) + tdSql.query("select last(*) from stb_1") + tdSql.checkRows(1) + tdSql.checkData(0, 1, None) + tdSql.query("select last(*) from db.stb_1") + tdSql.checkRows(1) + tdSql.checkData(0, 1, None) tdSql.query("select last(col1) from stb_1") tdSql.checkRows(0) tdSql.query("select last(col1) from db.stb_1") @@ -86,12 +86,12 @@ class TDTestCase: tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" % (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) - # tdSql.query("select last(*) from stb_1") - # tdSql.checkRows(1) - # tdSql.checkData(0, 1, 10) - # tdSql.query("select last(*) from db.stb_1") - # tdSql.checkRows(1) - # tdSql.checkData(0, 1, 10) + tdSql.query("select last(*) from stb_1") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 10) + tdSql.query("select last(*) from db.stb_1") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 10) tdSql.query("select last(col1) from stb_1") tdSql.checkRows(1) tdSql.checkData(0, 0, 10) @@ -175,12 +175,12 @@ class TDTestCase: tdSql.execute('''create table ntb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') tdSql.execute("insert into ntb(ts) values(%d)" % (self.ts - 1)) - # tdSql.query("select last(*) from ntb") - # tdSql.checkRows(1) - # tdSql.checkData(0, 1, None) - # tdSql.query("select last(*) from db.ntb") - # tdSql.checkRows(1) - # tdSql.checkData(0, 1, None) + tdSql.query("select last(*) from ntb") + tdSql.checkRows(1) + tdSql.checkData(0, 1, None) + tdSql.query("select last(*) from db.ntb") + tdSql.checkRows(1) + tdSql.checkData(0, 1, None) tdSql.query("select last(col1) from ntb") tdSql.checkRows(0) tdSql.query("select last(col1) from db.ntb") From bb6a7855fbe199fe709aaddf291b1f70d2c4b159 Mon Sep 17 00:00:00 2001 From: jiacy-jcy <714897623@qq.com> Date: Sat, 7 May 2022 16:32:44 +0800 Subject: [PATCH 7/9] update --- tests/system-test/2-query/last.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/system-test/2-query/last.py b/tests/system-test/2-query/last.py index 531afd0117..b491679c62 100644 --- a/tests/system-test/2-query/last.py +++ b/tests/system-test/2-query/last.py @@ -316,12 +316,12 @@ class TDTestCase: tdSql.query("select last(col8) from db.ntb") tdSql.checkRows(1) tdSql.checkData(0, 0, 'taosdata10') - # tdSql.query("select last(col9) from ntb") - # tdSql.checkRows(1) - # tdSql.checkData(0, 0, '涛思数据10') - # tdSql.query("select last(col9) from db.ntb") - # tdSql.checkRows(1) - # tdSql.checkData(0, 0, '涛思数据10') + tdSql.query("select last(col9) from ntb") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '涛思数据10') + tdSql.query("select last(col9) from db.ntb") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '涛思数据10') def stop(self): tdSql.close() From d7eface9accde0401a75e2518e1e59202cc5b097 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 7 May 2022 16:55:40 +0800 Subject: [PATCH 8/9] feat: rollup sma optimization --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 8 +++++--- source/dnode/vnode/src/tsdb/tsdbSma.c | 16 +++++++++++++++- source/dnode/vnode/src/vnd/vnodeCfg.c | 2 +- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 48d32c0dd3..eaa0893f29 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -249,8 +249,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); lastKey = rowKey; - ++pCols->numOfRows; - tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); + if (pCols) { + ++pCols->numOfRows; + tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); + } } else { tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); } @@ -279,7 +281,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey } #endif } - if (lastKey != TSKEY_INITIAL_VAL) { + if (pCols && (lastKey != TSKEY_INITIAL_VAL)) { ++pCols->numOfRows; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 105bc330d3..856481bc5f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -1638,7 +1638,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); return -1; } - + tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb), vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid); @@ -2006,6 +2006,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level) { SArray *pResult = NULL; + + if (!taskInfo) { + tsdbDebug("vgId:%d no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, REPO_ID(pTsdb), level, suid); + return TSDB_CODE_SUCCESS; + } + tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo, suid); @@ -2071,10 +2077,18 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid); return TSDB_CODE_SUCCESS; } + if (!pRSmaInfo->taskInfo[0]) { + tsdbDebug("vgId:%d no rsma qTaskInfo for suid:%" PRIu64, REPO_ID(pTsdb), suid); + return TSDB_CODE_SUCCESS; + } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { // TODO: use the proper schema instead of 0, and cache STSchema in cache STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0); + if (!pTSchema) { + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + return TSDB_CODE_FAILED; + } tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1); tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2); taosMemoryFree(pTSchema); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 7eac0389d2..5e21abb404 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -25,7 +25,7 @@ const SVnodeCfg vnodeCfgDefault = { .isHeap = false, .isWeak = 0, .tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI, - .update = 0, + .update = 1, .compression = 2, .slLevel = 5, .days = 10, From f867e8ea9165c20f0f09c56765e7f236401db6ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 May 2022 17:23:05 +0800 Subject: [PATCH 9/9] fix(query): eliminate memory leak. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorMain.c | 15 --------------- source/libs/executor/src/executorimpl.c | 2 +- 3 files changed, 2 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9eed76a0fe..3ed65f4a05 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -225,6 +225,7 @@ typedef struct SExecTaskInfo { char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + struct SSubplan *plan; struct SOperatorInfo* pRoot; } SExecTaskInfo; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 7705744694..ba77950912 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -36,21 +36,6 @@ typedef struct STaskMgmt { bool closed; } STaskMgmt; -static void taskMgmtKillTaskFn(void* handle, void* param1) { - void** fp = (void**)handle; - qKillTask(*fp); -} - -static void freeqinfoFn(void *qhandle) { - void** handle = qhandle; - if (handle == NULL || *handle == NULL) { - return; - } - - qKillTask(*handle); - qDestroyTask(*handle); -} - int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { assert(readHandle != NULL && pSubplan != NULL); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 34339d3a7f..c0ea54ce4a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5209,6 +5209,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } + (*pTaskInfo)->plan = pPlan; return code; _complete: @@ -5311,7 +5312,6 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { // taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosHashCleanup(pTaskInfo->summary.operatorProfResults); - destroyOperatorInfo(pTaskInfo->pRoot); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo);