Merge pull request #25386 from taosdata/fix/3.0/TD-29631

fix cancel create tsma memory use after free
This commit is contained in:
dapan1121 2024-04-23 13:17:55 +08:00 committed by GitHub
commit 07e01b3e97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 69 additions and 30 deletions

View File

@ -538,7 +538,6 @@ void stopAllQueries(SRequestObj *pRequest) {
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
@ -547,6 +546,7 @@ void stopAllQueries(SRequestObj *pRequest) {
for (int32_t i = reqIdx; i >= 0; i--) {
taosStopQueryImpl(pReqList[i]);
releaseRequest(pReqList[i]->self);
}
taosStopQueryImpl(pRequest);

View File

@ -1526,6 +1526,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
int32_t code = -1;
STransAction createStreamRedoAction = {0};
STransAction createStreamUndoAction = {0};
STransAction dropStbUndoAction = {0};
SMDropStbReq dropStbReq = {0};
STrans *pTrans =
mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
if (!pTrans) {
@ -1556,7 +1558,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
createStreamUndoAction.epSet = createStreamRedoAction.epSet;
createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
createStreamUndoAction.actionType = TDMT_STREAM_DROP;
createStreamUndoAction.msgType = TDMT_STREAM_DROP;
createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
if (!createStreamUndoAction.pCont) {
@ -1569,6 +1571,24 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
goto _OVER;
}
dropStbReq.igNotExists = true;
strncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
dropStbUndoAction.epSet = createStreamRedoAction.epSet;
dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
if (!dropStbUndoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
SDbObj newDb = {0};
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
newDb.tsmaVersion++;
@ -1579,6 +1599,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndTransAppendRedoAction(pTrans, &createStreamRedoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &createStreamUndoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &dropStbUndoAction) != 0) goto _OVER;
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
code = TSDB_CODE_SUCCESS;

View File

@ -1109,7 +1109,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
goto _OVER;
}
int32_t actionNum = taosArrayGetSize(pTrans->redoActions);
int32_t actionNum = taosArrayGetSize(pArray);
if (action < 0 || action >= actionNum) {
mError("trans:%d, invalid action:%d", transId, action);
goto _OVER;

View File

@ -287,7 +287,19 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
SMetaReader* mr = (SMetaReader*)pContext;
bool isTagCol = false, isTbname = false;
if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)*pNode;
if (pCol->colType == COLUMN_TYPE_TBNAME)
isTbname = true;
else
isTagCol = true;
} else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (pFunc->funcType == FUNCTION_TYPE_TBNAME)
isTbname = true;
}
if (isTagCol) {
SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
@ -316,16 +328,14 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
}
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
} else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
SFunctionNode* pFuncNode = *(SFunctionNode**)pNode;
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
} else if (isTbname) {
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
return DEAL_RES_ERROR;
}
res->translate = true;
res->node.resType = pFuncNode->node.resType;
res->node.resType = ((SExprNode*)(*pNode))->resType;
int32_t len = strlen(mr->me.name);
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
@ -334,7 +344,6 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
}
}
return DEAL_RES_CONTINUE;
}

View File

@ -2517,7 +2517,7 @@ static int32_t translateMd5(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN, .type = TSDB_DATA_TYPE_VARCHAR};
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}

View File

@ -6332,6 +6332,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
if (code == TSDB_CODE_SUCCESS) {
code = tsmaOptRewriteNodeList(pNewScan->pGroupTags, pTsmaOptCtx, pTsma, true, true);
}
pTsmaOptCtx->pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
if (pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo && pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo->size > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pTsmaOptCtx->pScan->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, i);

View File

@ -1209,7 +1209,7 @@ class TDTestCase:
self.test_ddl()
self.test_query_with_tsma()
# bug to fix
# self.test_flush_query()
self.test_flush_query()
#cluster test
cluster_dnode_list = tdSql.get_cluseter_dnodes()
@ -1231,14 +1231,22 @@ class TDTestCase:
# self.test_drop_ctable()
self.test_drop_db()
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float):
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float, is_expect_row = None):
timeout = timeout_in_seconds
tdSql.query(sql)
while timeout > 0 and tdSql.getRows() != expected_row_num:
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {tdSql.getRows()}, remain: {timeout_in_seconds - timeout}')
rows: int = 0
for row in tdSql.queryResult:
if is_expect_row is None or is_expect_row(row):
rows = rows + 1
while timeout > 0 and rows != expected_row_num:
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {str(tdSql.queryResult)} useful rows: {rows}, remain: {timeout_in_seconds - timeout}')
time.sleep(1)
timeout = timeout - 1
tdSql.query(sql)
rows = 0
for row in tdSql.queryResult:
if is_expect_row is None or is_expect_row(row):
rows = rows + 1
if timeout <= 0:
tdLog.exit(f'failed to wait query: {sql} to return {expected_row_num} rows timeout: {timeout_in_seconds}s')
else:
@ -1255,7 +1263,7 @@ class TDTestCase:
tdSql.error('drop tsma test.tsma1', -2147482491)
tdSql.execute('drop tsma test.tsma2', queryTimes=1)
tdSql.execute('drop tsma test.tsma1', queryTimes=1)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
@ -1296,7 +1304,7 @@ class TDTestCase:
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096)
tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database nsdb')
# drop norm table
@ -1323,7 +1331,7 @@ class TDTestCase:
# test drop stream
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
@ -1424,7 +1432,7 @@ class TDTestCase:
tdSql.error(
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
self.wait_query('show transactions', 0, 10)
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
tdSql.execute('drop database nsdb')
def test_create_tsma_on_norm_table(self):
@ -1569,7 +1577,7 @@ class TDTestCase:
tdSql.error('create tsma tsma_illegal on test.meters function(avg(c8)) interval(5m)',-2147473406)
def test_flush_query(self):
tdSql.execute('insert into test.norm_tb (ts,c1_new,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1_new),avg(c2) from test.norm_tb interval(10m);select avg(c1_new),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
tdSql.execute('insert into test.norm_tb (ts,c1,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1),avg(c2) from test.norm_tb interval(10m);select avg(c1),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
tdSql.execute('flush database test', queryTimes=1)
tdSql.query('select count(*) from test.meters', queryTimes=1)
tdSql.checkData(0,0,100000)