From 6aa09f4d8f3ad83f135747a363c073f1a9390012 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 17 Apr 2024 13:55:06 +0800 Subject: [PATCH 1/3] fix cancel create tsma memory use after free --- source/client/src/clientEnv.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 6c20813118..7f73aa6845 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -538,7 +538,6 @@ void stopAllQueries(SRequestObj *pRequest) { pTmp = acquireRequest(tmpRefId); if (pTmp) { pReqList[++reqIdx] = pTmp; - releaseRequest(tmpRefId); } else { tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId); break; @@ -547,6 +546,7 @@ void stopAllQueries(SRequestObj *pRequest) { for (int32_t i = reqIdx; i >= 0; i--) { taosStopQueryImpl(pReqList[i]); + releaseRequest(pReqList[i]->self); } taosStopQueryImpl(pRequest); From 2c0624a8ae6cdd8504d79f81f6ae4aa5e4cc50e6 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 17 Apr 2024 15:29:36 +0800 Subject: [PATCH 2/3] fix create tsma transaction --- source/dnode/mnode/impl/src/mndSma.c | 23 ++++++++++++++++++++++- source/dnode/mnode/impl/src/mndTrans.c | 2 +- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index dd569b4c59..aaa0c42262 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1526,6 +1526,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { int32_t code = -1; STransAction createStreamRedoAction = {0}; STransAction createStreamUndoAction = {0}; + STransAction dropStbUndoAction = {0}; + SMDropStbReq dropStbReq = {0}; STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma"); if (!pTrans) { @@ -1556,7 +1558,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { createStreamUndoAction.epSet = createStreamRedoAction.epSet; createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST; - createStreamUndoAction.actionType = TDMT_STREAM_DROP; + createStreamUndoAction.msgType = TDMT_STREAM_DROP; createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq); createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen); if (!createStreamUndoAction.pCont) { @@ -1569,6 +1571,24 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { goto _OVER; } + dropStbReq.igNotExists = true; + strncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); + dropStbUndoAction.epSet = createStreamRedoAction.epSet; + dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST; + dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; + dropStbUndoAction.msgType = TDMT_MND_STB_DROP; + dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq); + dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen); + if (!dropStbUndoAction.pCont) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) { + mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name); + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + SDbObj newDb = {0}; memcpy(&newDb, pCxt->pDb, sizeof(SDbObj)); newDb.tsmaVersion++; @@ -1579,6 +1599,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; if (mndTransAppendRedoAction(pTrans, &createStreamRedoAction) != 0) goto _OVER; if (mndTransAppendUndoAction(pTrans, &createStreamUndoAction) != 0) goto _OVER; + if (mndTransAppendUndoAction(pTrans, &dropStbUndoAction) != 0) goto _OVER; if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER; code = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 41ff45038f..7b6563f4b4 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1109,7 +1109,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { goto _OVER; } - int32_t actionNum = taosArrayGetSize(pTrans->redoActions); + int32_t actionNum = taosArrayGetSize(pArray); if (action < 0 || action >= actionNum) { mError("trans:%d, invalid action:%d", transId, action); goto _OVER; From f971cfb778d1047c1db66bd463be280fe17afbea Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 22 Apr 2024 14:00:06 +0800 Subject: [PATCH 3/3] 1. fix stream wrong group id for new child tables. 2. fix md5 function wrong bytes returned --- source/libs/executor/src/executil.c | 43 +++++++++++++++---------- source/libs/function/src/builtins.c | 2 +- source/libs/planner/src/planOptimizer.c | 1 + tests/system-test/2-query/tsma.py | 26 +++++++++------ 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index be6fb2983c..d06beebd6b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -287,7 +287,19 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { SMetaReader* mr = (SMetaReader*)pContext; + bool isTagCol = false, isTbname = false; if (nodeType(*pNode) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)*pNode; + if (pCol->colType == COLUMN_TYPE_TBNAME) + isTbname = true; + else + isTagCol = true; + } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) { + SFunctionNode* pFunc = (SFunctionNode*)*pNode; + if (pFunc->funcType == FUNCTION_TYPE_TBNAME) + isTbname = true; + } + if (isTagCol) { SColumnNode* pSColumnNode = *(SColumnNode**)pNode; SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); @@ -316,24 +328,21 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { } nodesDestroyNode(*pNode); *pNode = (SNode*)res; - } else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) { - SFunctionNode* pFuncNode = *(SFunctionNode**)pNode; - if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) { - SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); - if (NULL == res) { - return DEAL_RES_ERROR; - } - - res->translate = true; - res->node.resType = pFuncNode->node.resType; - - int32_t len = strlen(mr->me.name); - res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1); - memcpy(varDataVal(res->datum.p), mr->me.name, len); - varDataSetLen(res->datum.p, len); - nodesDestroyNode(*pNode); - *pNode = (SNode*)res; + } else if (isTbname) { + SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (NULL == res) { + return DEAL_RES_ERROR; } + + res->translate = true; + res->node.resType = ((SExprNode*)(*pNode))->resType; + + int32_t len = strlen(mr->me.name); + res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1); + memcpy(varDataVal(res->datum.p), mr->me.name, len); + varDataSetLen(res->datum.p, len); + nodesDestroyNode(*pNode); + *pNode = (SNode*)res; } return DEAL_RES_CONTINUE; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 394eecd542..bcd1ab5c18 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2517,7 +2517,7 @@ static int32_t translateMd5(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN, .type = TSDB_DATA_TYPE_VARCHAR}; + pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index ae37334762..4faf93c734 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -6328,6 +6328,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew if (code == TSDB_CODE_SUCCESS) { code = tsmaOptRewriteNodeList(pNewScan->pGroupTags, pTsmaOptCtx, pTsma, true, true); } + pTsmaOptCtx->pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; if (pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo && pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo->size > 0) { for (int32_t i = 0; i < taosArrayGetSize(pTsmaOptCtx->pScan->pTsmas); ++i) { STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, i); diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 606faf6312..422c9a2f1d 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1209,7 +1209,7 @@ class TDTestCase: self.test_ddl() self.test_query_with_tsma() # bug to fix - # self.test_flush_query() + self.test_flush_query() #cluster test cluster_dnode_list = tdSql.get_cluseter_dnodes() @@ -1231,14 +1231,22 @@ class TDTestCase: # self.test_drop_ctable() self.test_drop_db() - def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float): + def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float, is_expect_row = None): timeout = timeout_in_seconds tdSql.query(sql) - while timeout > 0 and tdSql.getRows() != expected_row_num: - tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {tdSql.getRows()}, remain: {timeout_in_seconds - timeout}') + rows: int = 0 + for row in tdSql.queryResult: + if is_expect_row is None or is_expect_row(row): + rows = rows + 1 + while timeout > 0 and rows != expected_row_num: + tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {str(tdSql.queryResult)} useful rows: {rows}, remain: {timeout_in_seconds - timeout}') time.sleep(1) timeout = timeout - 1 tdSql.query(sql) + rows = 0 + for row in tdSql.queryResult: + if is_expect_row is None or is_expect_row(row): + rows = rows + 1 if timeout <= 0: tdLog.exit(f'failed to wait query: {sql} to return {expected_row_num} rows timeout: {timeout_in_seconds}s') else: @@ -1255,7 +1263,7 @@ class TDTestCase: tdSql.error('drop tsma test.tsma1', -2147482491) tdSql.execute('drop tsma test.tsma2', queryTimes=1) tdSql.execute('drop tsma test.tsma1', queryTimes=1) - self.wait_query('show transactions', 0, 10) + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1296,7 +1304,7 @@ class TDTestCase: 'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096) tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1) - self.wait_query('show transactions', 0, 10) + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') tdSql.execute('drop database nsdb') # drop norm table @@ -1323,7 +1331,7 @@ class TDTestCase: # test drop stream tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first - self.wait_query('show transactions', 0, 10) + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') tdSql.execute('drop database test', queryTimes=1) self.init_data() @@ -1424,7 +1432,7 @@ class TDTestCase: tdSql.error( 'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) - self.wait_query('show transactions', 0, 10) + self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo') tdSql.execute('drop database nsdb') def test_create_tsma_on_norm_table(self): @@ -1569,7 +1577,7 @@ class TDTestCase: tdSql.error('create tsma tsma_illegal on test.meters function(avg(c8)) interval(5m)',-2147473406) def test_flush_query(self): - tdSql.execute('insert into test.norm_tb (ts,c1_new,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1_new),avg(c2) from test.norm_tb interval(10m);select avg(c1_new),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1) + tdSql.execute('insert into test.norm_tb (ts,c1,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1),avg(c2) from test.norm_tb interval(10m);select avg(c1),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1) tdSql.execute('flush database test', queryTimes=1) tdSql.query('select count(*) from test.meters', queryTimes=1) tdSql.checkData(0,0,100000)