From 4a54ce26d5d842be7ce629ea0e39f87a35a0d965 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 18 Jul 2022 14:25:00 +0800 Subject: [PATCH 01/25] fix: fix sys table show stables issue --- source/libs/executor/src/scanoperator.c | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b80b9af237..f51dd076a7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2126,6 +2126,21 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { } } + +static SSDataBlock* sysTableScanUserSTables(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSysTableScanInfo* pInfo = pOperator->info; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + pInfo->pRes->info.rows = 0; + pOperator->status == OP_EXEC_DONE; + + pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; + return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; +} + static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // build message and send to mnode to fetch the content of system tables. SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2136,6 +2151,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { return sysTableScanUserTables(pOperator); } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { return sysTableScanUserTags(pOperator); + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, TSDB_TABLE_FNAME_LEN) == 0) { + return sysTableScanUserSTables(pOperator); } else { // load the meta from mnode of the given epset if (pOperator->status == OP_EXEC_DONE) { return NULL; From 144405443793d90f4795f2844c30345c8e3a498a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 18 Jul 2022 18:57:47 +0800 Subject: [PATCH 02/25] fix: fix stmt memory leak --- source/client/src/clientStmt.c | 13 +++++++++---- source/libs/nodes/src/nodesUtilFuncs.c | 8 ++------ source/libs/parser/src/parInsert.c | 4 +++- source/libs/parser/src/parser.c | 6 ++++++ tests/script/api/batchprepare.c | 9 ++++----- 5 files changed, 24 insertions(+), 16 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index bf00965c7a..7a83006961 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -6,11 +6,16 @@ #include "clientStmt.h" static int32_t stmtCreateRequest(STscStmt* pStmt) { + int32_t code = 0; + if (pStmt->exec.pRequest == NULL) { - return buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest); - } else { - return TSDB_CODE_SUCCESS; + code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest); + if (TSDB_CODE_SUCCESS == code) { + pStmt->exec.pRequest->syncQuery = true; + } } + + return code; } int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) { @@ -227,7 +232,7 @@ int32_t stmtParseSql(STscStmt* pStmt) { }; STMT_ERR_RET(stmtCreateRequest(pStmt)); - + STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb)); pStmt->bInfo.needParse = false; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 1dc3db033b..c96dc194ca 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -388,11 +388,6 @@ static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode((SNode* static void destroyExprNode(SExprNode* pExpr) { taosArrayDestroy(pExpr->pAssociation); } -static void nodesDestroyNodePointer(void* node) { - SNode* pNode = *(SNode**)node; - nodesDestroyNode(pNode); -} - void nodesDestroyNode(SNode* pNode) { if (NULL == pNode) { return; @@ -716,6 +711,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_QUERY: { SQuery* pQuery = (SQuery*)pNode; nodesDestroyNode(pQuery->pRoot); + nodesDestroyNode(pQuery->pPrepareRoot); taosMemoryFreeClear(pQuery->pResSchema); if (NULL != pQuery->pCmdMsg) { taosMemoryFreeClear(pQuery->pCmdMsg->pMsg); @@ -723,7 +719,7 @@ void nodesDestroyNode(SNode* pNode) { } taosArrayDestroy(pQuery->pDbList); taosArrayDestroy(pQuery->pTableList); - taosArrayDestroyEx(pQuery->pPlaceholderValues, nodesDestroyNodePointer); + taosArrayDestroy(pQuery->pPlaceholderValues); break; } case QUERY_NODE_LOGIC_PLAN_SCAN: { diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 5ac4476d14..3defd0224b 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1497,7 +1497,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pCxt->pVgroupsHashObj = NULL; pCxt->pTableBlockHashObj = NULL; - pCxt->pTableMeta = NULL; return TSDB_CODE_SUCCESS; } @@ -1554,7 +1553,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache if (NULL == *pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } + } else { + nodesDestroyNode((*pQuery)->pRoot); } + (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; (*pQuery)->haveResultSet = false; (*pQuery)->msgType = TDMT_VND_SUBMIT; diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index fdba0e2fcc..78d1e83436 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -82,11 +82,16 @@ static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCa } static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) { + if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) { + taosMemoryFreeClear(pVal->datum.p); + } + if (pParam->is_null && 1 == *(pParam->is_null)) { pVal->node.resType.type = TSDB_DATA_TYPE_NULL; pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; return TSDB_CODE_SUCCESS; } + int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes); pVal->node.resType.type = pParam->buffer_type; pVal->node.resType.bytes = inputSize; @@ -239,6 +244,7 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx } if (TSDB_CODE_SUCCESS == code && (colIdx < 0 || colIdx + 1 == pQuery->placeholderNum)) { + nodesDestroyNode(pQuery->pRoot); pQuery->pRoot = nodesCloneNode(pQuery->pPrepareRoot); if (NULL == pQuery->pRoot) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 29c1fdb015..a330b6416e 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -218,7 +218,7 @@ typedef struct { } CaseCtrl; #if 0 -CaseCtrl gCaseCtrl = { // default +CaseCtrl gCaseCtrl = { .precision = TIME_PRECISION_MICRO, .bindNullNum = 0, .printCreateTblSql = false, @@ -251,7 +251,7 @@ CaseCtrl gCaseCtrl = { // default #if 1 -CaseCtrl gCaseCtrl = { +CaseCtrl gCaseCtrl = { // default .precision = TIME_PRECISION_MILLI, .bindNullNum = 0, .printCreateTblSql = false, @@ -2596,6 +2596,8 @@ void runAll(TAOS *taos) { printf("%s Begin\n", gCaseCtrl.caseCatalog); runCaseList(taos); +#if 0 + strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.precision = TIME_PRECISION_MICRO; @@ -2626,7 +2628,6 @@ void runAll(TAOS *taos) { runCaseList(taos); gCaseCtrl.bindRowNum = 0; -#if 0 strcpy(gCaseCtrl.caseCatalog, "Row Num Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.rowNum = 1000; @@ -2640,7 +2641,6 @@ void runAll(TAOS *taos) { gCaseCtrl.runTimes = 2; runCaseList(taos); gCaseCtrl.runTimes = 0; -#endif strcpy(gCaseCtrl.caseCatalog, "Check Param Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); @@ -2648,7 +2648,6 @@ void runAll(TAOS *taos) { runCaseList(taos); gCaseCtrl.checkParamNum = false; -#if 0 strcpy(gCaseCtrl.caseCatalog, "Bind Col Num Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.bindColNum = 6; From 3e2ded3973917931cc84157c29884d3610fe5cf3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 19 Jul 2022 10:12:02 +0800 Subject: [PATCH 03/25] fix: add debug info --- source/libs/executor/src/scanoperator.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 200b920a0e..91aefd3406 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2020,8 +2020,8 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; int32_t code = metaGetTableEntryByUid(&mr, suid); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), - GET_TASKID(pTaskInfo)); + qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", + pInfo->pCur->mr.me.name, suid, tstrerror(terrno), GET_TASKID(pTaskInfo)); metaReaderClear(&mr); metaCloseTbCursor(pInfo->pCur); pInfo->pCur = NULL; From 80808766c17d30c25aa7f5febb903cb5057d2d53 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 19 Jul 2022 15:17:43 +0800 Subject: [PATCH 04/25] fix: fix memory leak --- source/client/inc/clientStmt.h | 2 +- source/client/src/clientStmt.c | 16 ++++++++++------ source/libs/parser/src/parInsertData.c | 1 + 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index c2b5d1de6f..a7adaef966 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -72,7 +72,6 @@ typedef struct SStmtBindInfo { typedef struct SStmtExecInfo { int32_t affectedRows; SRequestObj* pRequest; - SHashObj* pVgHash; SHashObj* pBlockHash; bool autoCreateTbl; } SStmtExecInfo; @@ -88,6 +87,7 @@ typedef struct SStmtSQLInfo { SArray* nodeList; SStmtQueryResInfo queryRes; bool autoCreateTbl; + SHashObj* pVgHash; } SStmtSQLInfo; typedef struct STscStmt { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 7a83006961..70edb32f2d 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -160,7 +160,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) { STscStmt* pStmt = (STscStmt*)stmt; - pStmt->exec.pVgHash = pVgHash; + pStmt->sql.pVgHash = pVgHash; pStmt->exec.pBlockHash = pBlockHash; pStmt->exec.autoCreateTbl = autoCreateTbl; @@ -182,7 +182,7 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) { STscStmt* pStmt = (STscStmt*)stmt; - *pVgHash = pStmt->exec.pVgHash; + *pVgHash = pStmt->sql.pVgHash; *pBlockHash = pStmt->exec.pBlockHash; return TSDB_CODE_SUCCESS; @@ -313,6 +313,8 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { taosMemoryFree(pStmt->sql.sqlStr); qDestroyQuery(pStmt->sql.pQuery); taosArrayDestroy(pStmt->sql.nodeList); + taosHashCleanup(pStmt->sql.pVgHash); + pStmt->sql.pVgHash = NULL; void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL); while (pIter) { @@ -345,7 +347,7 @@ int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STab STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo)); STMT_ERR_RET( - taosHashPut(pStmt->exec.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo))); + taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo))); STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId)); @@ -685,6 +687,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { if (pStmt->sql.pQuery->haveResultSet) { setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema, pStmt->sql.pQuery->numOfResCols); + taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema); setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision); } @@ -809,7 +812,7 @@ int stmtExec(TAOS_STMT* stmt) { if (STMT_TYPE_QUERY == pStmt->sql.type) { launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } else { - STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash)); + STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, (autoCreateTbl ? (void**)&pRsp : NULL)); } @@ -852,9 +855,10 @@ _return: int stmtClose(TAOS_STMT* stmt) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_RET(stmtCleanSQLInfo(pStmt)); - + stmtCleanSQLInfo(pStmt); taosMemoryFree(stmt); + + return TSDB_CODE_SUCCESS; } const char* stmtErrstr(TAOS_STMT* stmt) { diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 290c65de12..9e1d8dba8b 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -678,6 +678,7 @@ void qFreeStmtDataBlock(void* pDataBlock) { return; } + taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta); taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData); taosMemoryFreeClear(pDataBlock); } From d2e780b14e778989dd54ab28c8b2381a998a6d92 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 19 Jul 2022 16:47:53 +0800 Subject: [PATCH 05/25] fix: fix memory leak --- source/libs/transport/src/trans.c | 2 +- source/libs/transport/src/transComm.c | 3 ++- tests/script/api/batchprepare.c | 4 +++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 725f3b32cf..79f2f17a6e 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -112,7 +112,7 @@ void* rpcMallocCont(int32_t contLen) { void rpcFreeCont(void* cont) { if (cont == NULL) return; taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD); - tTrace("free mem:%p", (char*)cont - TRANS_MSG_OVERHEAD); + tTrace("rpc free cont:%p", (char*)cont - TRANS_MSG_OVERHEAD); } void* rpcReallocCont(void* ptr, int32_t contLen) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 84af8da513..0e1e4c6040 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -124,6 +124,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { SConnBuffer* p = connBuf; if (p->cap == 0) { p->buf = (char*)taosMemoryCalloc(CAPACITY, sizeof(char)); + tTrace("internal malloc mem:%p, size:%d", p->buf, CAPACITY); p->len = 0; p->cap = CAPACITY; p->total = -1; @@ -136,7 +137,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { } else { p->cap = p->total; p->buf = taosMemoryRealloc(p->buf, p->cap); - tTrace("internal malloc mem:%p, size:%d", p->buf, p->cap); + tTrace("internal realloc mem:%p, size:%d", p->buf, p->cap); uvBuf->base = p->buf + p->len; uvBuf->len = p->cap - p->len; diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index a330b6416e..e89a8b33eb 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -328,7 +328,7 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper //.optrIdxList = optrIdxList, //.bindColTypeNum = tListLen(bindColTypeList), //.bindColTypeList = bindColTypeList, - .caseIdx = 24, + .caseIdx = 8, .caseNum = 1, .caseRunNum = 1, }; @@ -1384,6 +1384,7 @@ void bpCheckTagFields(TAOS_STMT *stmt, TAOS_MULTI_BIND* pBind) { } bpCheckColTagFields(stmt, fieldNum, pFields, gCurCase->bindTagNum, pBind, BP_BIND_TAG); + taosMemoryFree(pFields); } void bpCheckColFields(TAOS_STMT *stmt, TAOS_MULTI_BIND* pBind) { @@ -1401,6 +1402,7 @@ void bpCheckColFields(TAOS_STMT *stmt, TAOS_MULTI_BIND* pBind) { } bpCheckColTagFields(stmt, fieldNum, pFields, gCurCase->bindColNum, pBind, BP_BIND_COL); + taosMemoryFree(pFields); } void bpShowBindParam(TAOS_MULTI_BIND *bind, int32_t num) { From c0a21dc911978a0421751bf25fd688b6c16665f0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 19 Jul 2022 16:56:37 +0800 Subject: [PATCH 06/25] avoid mem leak --- source/libs/transport/src/transCli.c | 3 +++ source/libs/transport/src/transComm.c | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index be3111e870..5d63c8daf6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -198,6 +198,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } \ destroyCmsg(pMsg); \ cliReleaseUnfinishedMsg(conn); \ + transQueueClear(&conn->cliMsgs); \ addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ return; \ } \ @@ -545,6 +546,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { STrans* pTransInst = thrd->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); + cliReleaseUnfinishedMsg(conn); transQueueClear(&conn->cliMsgs); transCtxCleanup(&conn->ctx); conn->status = ConnInPool; @@ -645,6 +647,7 @@ static void cliDestroy(uv_handle_t* handle) { conn->stream->data = NULL; taosMemoryFree(conn->stream); transCtxCleanup(&conn->ctx); + cliReleaseUnfinishedMsg(conn); transQueueDestroy(&conn->cliMsgs); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 84af8da513..e74f64faae 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -240,7 +240,7 @@ void transCtxCleanup(STransCtx* ctx) { ctx->freeFunc(iter->val); iter = taosHashIterate(ctx->args, iter); } - + ctx->freeFunc(ctx->brokenVal.val); taosHashCleanup(ctx->args); ctx->args = NULL; } From d266bfe1875e758bcb33a23243fb69d1b022dcb0 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 20 Jul 2022 08:39:52 +0800 Subject: [PATCH 07/25] fix: fix stmt memory leak --- include/os/osSysinfo.h | 2 +- source/dnode/mgmt/exe/dmMain.c | 4 ++-- source/libs/function/src/udfd.c | 4 ++-- source/os/src/osSysinfo.c | 2 +- tests/script/api/batchprepare.c | 9 +++++---- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 4ec2e2884e..6eed31b5e9 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -33,7 +33,7 @@ typedef struct { SDiskSize size; } SDiskSpace; -bool taosCheckSystemIsSmallEnd(); +bool taosCheckSystemIsLittleEnd(); void taosGetSystemInfo(); int32_t taosGetEmail(char *email, int32_t maxLen); int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen); diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 00c32e1990..013cc05c65 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -158,8 +158,8 @@ static void taosCleanupArgs() { } int main(int argc, char const *argv[]) { - if (!taosCheckSystemIsSmallEnd()) { - printf("failed to start since on non-small-end machines\n"); + if (!taosCheckSystemIsLittleEnd()) { + printf("failed to start since on non-little-end machines\n"); return -1; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 74fca69aa7..2402607251 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -913,8 +913,8 @@ void udfdConnectMnodeThreadFunc(void *args) { } int main(int argc, char *argv[]) { - if (!taosCheckSystemIsSmallEnd()) { - printf("failed to start since on non-small-end machines\n"); + if (!taosCheckSystemIsLittleEnd()) { + printf("failed to start since on non-little-end machines\n"); return -1; } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index b6220b0ae8..8450e8baea 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -210,7 +210,7 @@ static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) { } -bool taosCheckSystemIsSmallEnd() { +bool taosCheckSystemIsLittleEnd() { union check { int16_t i; char ch[2]; diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index e89a8b33eb..ada2039460 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -299,7 +299,7 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper .printRes = true, .runTimes = 0, .caseRunIdx = -1, - .caseIdx = 23, + .caseIdx = 5, .caseNum = 1, .caseRunNum = 1, }; @@ -1408,7 +1408,7 @@ void bpCheckColFields(TAOS_STMT *stmt, TAOS_MULTI_BIND* pBind) { void bpShowBindParam(TAOS_MULTI_BIND *bind, int32_t num) { for (int32_t i = 0; i < num; ++i) { TAOS_MULTI_BIND* b = &bind[i]; - printf("Bind %d: type[%d],buf[%p],buflen[%d],len[%],null[%d],num[%d]\n", + printf("Bind %d: type[%d],buf[%p],buflen[%d],len[%d],null[%d],num[%d]\n", i, b->buffer_type, b->buffer, b->buffer_length, b->length ? *b->length : 0, b->is_null ? *b->is_null : 0, b->num); } } @@ -2599,7 +2599,6 @@ void runAll(TAOS *taos) { runCaseList(taos); #if 0 - strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.precision = TIME_PRECISION_MICRO; @@ -2655,13 +2654,15 @@ void runAll(TAOS *taos) { gCaseCtrl.bindColNum = 6; runCaseList(taos); gCaseCtrl.bindColNum = 0; +#endif +/* strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.bindColTypeNum = tListLen(bindColTypeList); gCaseCtrl.bindColTypeList = bindColTypeList; runCaseList(taos); -#endif +*/ printf("All Test End\n"); } From 5ed719c727241cdc6b0e36d641b514738043371c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 20 Jul 2022 09:39:58 +0800 Subject: [PATCH 08/25] fix: fix catalog memory leak --- include/common/tname.h | 1 + include/libs/qcom/query.h | 2 ++ source/common/src/tname.c | 5 +++++ source/libs/catalog/inc/catalogInt.h | 2 -- source/libs/catalog/src/catalog.c | 6 +++--- source/libs/catalog/src/ctgCache.c | 14 ++++++------- source/libs/catalog/src/ctgRemote.c | 26 +++++++++++++++++++++++++ source/libs/executor/src/scanoperator.c | 3 ++- 8 files changed, 46 insertions(+), 13 deletions(-) diff --git a/include/common/tname.h b/include/common/tname.h index 77965947ad..89c7764404 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -50,6 +50,7 @@ bool tNameIsValid(const SName* name); const char* tNameGetTableName(const SName* name); int32_t tNameGetDbName(const SName* name, char* dst); +const char* tNameGetDbNameP(const SName* name); int32_t tNameGetFullDbName(const SName* name, char* dst); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 58739b4af7..4efcc9031b 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -260,6 +260,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define REQUEST_TOTAL_EXEC_TIMES 2 +#define IS_SYS_DBNAME(_dbname) (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB)))) + #define qFatal(...) \ do { \ if (qDebugFlag & DEBUG_FATAL) { \ diff --git a/source/common/src/tname.c b/source/common/src/tname.c index 7183153824..c5bebf3630 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -190,6 +190,11 @@ int32_t tNameGetDbName(const SName* name, char* dst) { return 0; } +const char* tNameGetDbNameP(const SName* name) { + return &name->dbname[0]; +} + + int32_t tNameGetFullDbName(const SName* name, char* dst) { assert(name != NULL && dst != NULL); snprintf(dst, TSDB_DB_FNAME_LEN, "%d.%s", name->acctId, name->dbname); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9003de97d7..bf3bc1f0f4 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -460,8 +460,6 @@ typedef struct SCtgOperation { #define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) #define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) -#define CTG_IS_SYS_DBNAME(_dbname) (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB)))) - #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) #define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 1b7f53ae67..59f11898fa 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -865,7 +865,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo *pConn, SArray* tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - if (CTG_IS_SYS_DBNAME(name.dbname)) { + if (IS_SYS_DBNAME(name.dbname)) { continue; } @@ -936,7 +936,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { + if (IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -947,7 +947,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup) { CTG_API_ENTER(); - if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { + if (IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 499ce77276..06e8216e87 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -132,7 +132,7 @@ void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) { int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { char *p = strchr(dbFName, '.'); - if (p && CTG_IS_SYS_DBNAME(p + 1)) { + if (p && IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -694,7 +694,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) } char *p = strchr(dbFName, '.'); - if (p && CTG_IS_SYS_DBNAME(p + 1)) { + if (p && IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -727,7 +727,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) } char *p = strchr(dbFName, '.'); - if (p && CTG_IS_SYS_DBNAME(p + 1)) { + if (p && IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -823,7 +823,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId } char *p = strchr(dbFName, '.'); - if (p && CTG_IS_SYS_DBNAME(p + 1)) { + if (p && IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -859,7 +859,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy } char *p = strchr(output->dbFName, '.'); - if (p && CTG_IS_SYS_DBNAME(p + 1)) { + if (p && IS_SYS_DBNAME(p + 1)) { memmove(output->dbFName, p + 1, strlen(p + 1)); } @@ -2123,7 +2123,7 @@ int32_t ctgStartUpdateThread() { int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { - if (CTG_IS_SYS_DBNAME(ctx->pName->dbname)) { + if (IS_SYS_DBNAME(ctx->pName->dbname)) { CTG_FLAG_SET_SYS_DB(ctx->flag); } @@ -2177,7 +2177,7 @@ _return: } int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) { - if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { + if (IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 1e375471f9..cc5dde9298 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -375,6 +375,8 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL)); + rpcFreeCont(rpcRsp.pCont); + return TSDB_CODE_SUCCESS; } @@ -408,6 +410,8 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL)); + rpcFreeCont(rpcRsp.pCont); + return TSDB_CODE_SUCCESS; } @@ -447,6 +451,8 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db)); + rpcFreeCont(rpcRsp.pCont); + return TSDB_CODE_SUCCESS; } @@ -485,6 +491,8 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName)); + rpcFreeCont(rpcRsp.pCont); + return TSDB_CODE_SUCCESS; } @@ -522,6 +530,8 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName)); + + rpcFreeCont(rpcRsp.pCont); return TSDB_CODE_SUCCESS; } @@ -563,6 +573,8 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); + + rpcFreeCont(rpcRsp.pCont); return TSDB_CODE_SUCCESS; } @@ -602,6 +614,8 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName)); + rpcFreeCont(rpcRsp.pCont); + return TSDB_CODE_SUCCESS; } @@ -639,6 +653,8 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user)); + + rpcFreeCont(rpcRsp.pCont); return TSDB_CODE_SUCCESS; } @@ -683,6 +699,8 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); + rpcFreeCont(rpcRsp.pCont); + return TSDB_CODE_SUCCESS; } @@ -740,6 +758,8 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName)); + rpcFreeCont(rpcRsp.pCont); + return TSDB_CODE_SUCCESS; } @@ -784,6 +804,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); + + rpcFreeCont(rpcRsp.pCont); return TSDB_CODE_SUCCESS; } @@ -824,6 +846,8 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); + + rpcFreeCont(rpcRsp.pCont); return TSDB_CODE_SUCCESS; } @@ -858,6 +882,8 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp); CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL)); + + rpcFreeCont(rpcRsp.pCont); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bc526f8a31..48e6f51e42 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2147,11 +2147,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { SSysTableScanInfo* pInfo = pOperator->info; const char* name = tNameGetTableName(&pInfo->name); + const char* dbName = tNameGetDbNameP(&pInfo->name); if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { return sysTableScanUserTables(pOperator); } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { return sysTableScanUserTags(pOperator); - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, TSDB_TABLE_FNAME_LEN) == 0) { + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && IS_SYS_DBNAME(dbName)) { return sysTableScanUserSTables(pOperator); } else { // load the meta from mnode of the given epset if (pOperator->status == OP_EXEC_DONE) { From d6afa5e2c7137a5d05fb9830047d2150bb338a45 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 20 Jul 2022 09:46:53 +0800 Subject: [PATCH 09/25] fix: fix show stables issue --- source/libs/executor/src/scanoperator.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 48e6f51e42..6dc998e078 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2147,12 +2147,17 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { SSysTableScanInfo* pInfo = pOperator->info; const char* name = tNameGetTableName(&pInfo->name); - const char* dbName = tNameGetDbNameP(&pInfo->name); + if (pInfo->showRewrite) { + char dbName[TSDB_DB_NAME_LEN] = {0}; + getDBNameFromCondition(pInfo->pCondition, dbName); + sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); + } + if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { return sysTableScanUserTables(pOperator); } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { return sysTableScanUserTags(pOperator); - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && IS_SYS_DBNAME(dbName)) { + } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && IS_SYS_DBNAME(pInfo->req.db)) { return sysTableScanUserSTables(pOperator); } else { // load the meta from mnode of the given epset if (pOperator->status == OP_EXEC_DONE) { @@ -2164,12 +2169,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); strcpy(pInfo->req.user, pInfo->pUser); - if (pInfo->showRewrite) { - char dbName[TSDB_DB_NAME_LEN] = {0}; - getDBNameFromCondition(pInfo->pCondition, dbName); - sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); - } - int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); char* buf1 = taosMemoryCalloc(1, contLen); tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); From e477538f78398327ff64e7c5e8f539d4da15abf9 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 20 Jul 2022 09:50:42 +0800 Subject: [PATCH 10/25] fix: keep one case --- tests/script/api/batchprepare.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index ada2039460..e1aa1991a4 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -250,7 +250,7 @@ CaseCtrl gCaseCtrl = { #endif -#if 1 +#if 0 CaseCtrl gCaseCtrl = { // default .precision = TIME_PRECISION_MILLI, .bindNullNum = 0, @@ -282,7 +282,7 @@ CaseCtrl gCaseCtrl = { // default }; #endif -#if 0 +#if 1 CaseCtrl gCaseCtrl = { // query case with specified col&oper .bindNullNum = 1, .printCreateTblSql = false, From d60905aeb327c96a9f6182e4be434627ed60bfa6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 20 Jul 2022 10:01:29 +0800 Subject: [PATCH 11/25] fix: fix mem leak --- source/client/src/clientImpl.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2ddf843a08..c83b80dcfc 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -621,6 +621,7 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* } taosArrayPush(pDbVgList, &pVgList); + taosArrayDestroy(pVgList); } } From 4d18894657a32e27af0bd7a0c41636b3afb68323 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 20 Jul 2022 13:30:37 +0800 Subject: [PATCH 12/25] fix: fix mem leak --- source/client/src/clientImpl.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c83b80dcfc..ee78338a4e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -590,6 +590,11 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray return code; } +void freeVgList(void *list) { + SArray* pList = *(SArray**)list; + taosArrayDestroy(pList); +} + int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) { SArray* pDbVgList = NULL; SArray* pQnodeList = NULL; @@ -621,7 +626,6 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* } taosArrayPush(pDbVgList, &pVgList); - taosArrayDestroy(pVgList); } } @@ -642,7 +646,7 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* _return: - taosArrayDestroy(pDbVgList); + taosArrayDestroyEx(pDbVgList, freeVgList); taosArrayDestroy(pQnodeList); return code; From 38431432acb1f0cc22cc9725905ea88e47af7151 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 20 Jul 2022 13:48:14 +0800 Subject: [PATCH 13/25] fix: avoid rpc mem leak --- source/libs/transport/src/transCli.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5d63c8daf6..efb2434779 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1243,7 +1243,10 @@ void transReleaseCliHandle(void* handle) { void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) return; + if (pTransInst == NULL) { + transFreeMsg(pReq->pCont); + return; + } bool valid = false; SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid); @@ -1282,7 +1285,10 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) return; + if (pTransInst == NULL) { + transFreeMsg(pReq->pCont); + return; + } bool valid = false; SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid); From 7a62409dc171dd1afeb174c3dd0e38022b984157 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 20 Jul 2022 13:54:06 +0800 Subject: [PATCH 14/25] fix: fix taosd mem leak --- source/libs/executor/src/executorimpl.c | 6 +++--- source/libs/executor/src/scanoperator.c | 1 + source/libs/index/src/indexFilter.c | 2 ++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d3a7fc51eb..885c6d7458 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4284,7 +4284,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, REPLACE_NODE(pNew); } else { taosMemoryFree(keyBuf); - nodesClearList(groupNew); + nodesDestroyList(groupNew); metaReaderClear(&mr); return code; } @@ -4302,7 +4302,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, if (tTagIsJson(data)) { terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; taosMemoryFree(keyBuf); - nodesClearList(groupNew); + nodesDestroyList(groupNew); metaReaderClear(&mr); return terrno; } @@ -4325,7 +4325,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, info->groupId = groupId; groupNum++; - nodesClearList(groupNew); + nodesDestroyList(groupNew); metaReaderClear(&mr); } taosMemoryFree(keyBuf); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6dc998e078..c78788aa5c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -359,6 +359,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p SScalarParam param = {.columnData = pColInfoData}; fpSet.process(&srcParam, 1, ¶m); + colDataDestroy(&infoData); } static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index eadccba35f..27c90af3e7 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -707,6 +707,8 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { sifFreeParam(res); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); } + sifFreeRes(ctx.pRes); + SIF_RET(code); } From 31cfa1fa5b55fbc5e0f71c40da523543cd45c546 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 20 Jul 2022 14:14:46 +0800 Subject: [PATCH 15/25] fix: avoid rpc mem leak --- include/libs/transport/trpc.h | 18 +++++++--------- source/libs/transport/inc/transComm.h | 14 ++++++------ source/libs/transport/src/trans.c | 31 +++++++++++---------------- source/libs/transport/src/transCli.c | 29 ++++++++++++++----------- source/libs/transport/src/transSvr.c | 23 ++++++++++---------- 5 files changed, 56 insertions(+), 59 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 2ae1f7b854..50f9959177 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -124,18 +124,16 @@ void *rpcReallocCont(void *ptr, int32_t contLen); // Because taosd supports multi-process mode // These functions should not be used on the server side // Please use tmsg functions, which are defined in tmsgcb.h -void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendResponse(const SRpcMsg *pMsg); -void rpcRegisterBrokenLinkArg(SRpcMsg *msg); -void rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock +int rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +int rpcSendResponse(const SRpcMsg *pMsg); +int rpcRegisterBrokenLinkArg(SRpcMsg *msg); +int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock // These functions will not be called in the child process -void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); -void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); -int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); -void* rpcAllocHandle(); +int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); +void *rpcAllocHandle(); #ifdef __cplusplus } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 2972f512f1..8af3c8b7fd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -289,14 +289,14 @@ void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); -void transReleaseCliHandle(void* handle); -void transReleaseSrvHandle(void* handle); +int transReleaseCliHandle(void* handle); +int transReleaseSrvHandle(void* handle); -void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); -void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -void transSendResponse(const STransMsg* msg); -void transRegisterMsg(const STransMsg* msg); -void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int transSendResponse(const STransMsg* msg); +int transRegisterMsg(const STransMsg* msg); +int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); int64_t transAllocHandle(); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 79f2f17a6e..7633820292 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -25,7 +25,7 @@ void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; -void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; +int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { *ip = taosGetIpv4FromFqdn(localFqdn); @@ -129,25 +129,20 @@ void* rpcReallocCont(void* ptr, int32_t contLen) { return st + TRANS_MSG_OVERHEAD; } -void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { - // deprecated api - assert(0); -} - int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } -void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { - transSendRequest(shandle, pEpSet, pMsg, NULL); +int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { + return transSendRequest(shandle, pEpSet, pMsg, NULL); } -void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { - transSendRequest(shandle, pEpSet, pMsg, pCtx); +int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + return transSendRequest(shandle, pEpSet, pMsg, pCtx); } -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { - transSendRecv(shandle, pEpSet, pMsg, pRsp); +int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { + return transSendRecv(shandle, pEpSet, pMsg, pRsp); } -void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } +int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); @@ -159,15 +154,15 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } -void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { transRegisterMsg(msg); } -void rpcReleaseHandle(void* handle, int8_t type) { +int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } +int rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); - (*transReleaseHandle[type])(handle); + return (*transReleaseHandle[type])(handle); } -void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { +int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { // later - transSetDefaultAddr(thandle, ip, fqdn); + return transSetDefaultAddr(thandle, ip, fqdn); } void* rpcAllocHandle() { return (void*)transAllocHandle(); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index efb2434779..0a4b7ed9ab 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1224,13 +1224,13 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) { } return pThrd; } -void transReleaseCliHandle(void* handle) { +int transReleaseCliHandle(void* handle) { int idx = -1; bool valid = false; SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid); if (pThrd == NULL) { - return; + return -1; } STransMsg tmsg = {.info.handle = handle}; SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); @@ -1238,14 +1238,14 @@ void transReleaseCliHandle(void* handle) { cmsg->type = Release; transAsyncSend(pThrd->asyncPool, &cmsg->q); - return; + return 0; } -void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return; + return -1; } bool valid = false; @@ -1253,7 +1253,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra if (pThrd == NULL && valid == false) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return; + return -1; } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); @@ -1280,14 +1280,14 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return; + return 0; } -void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { +int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return; + return -1; } bool valid = false; @@ -1295,7 +1295,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM if (pThrd == NULL && valid == false) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return; + return -1; } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); @@ -1328,14 +1328,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM taosMemoryFree(sem); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return; + return 0; } /* * **/ -void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { +int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) return; + if (pTransInst == NULL) { + return -1; + } SCvtAddr cvtAddr = {0}; if (ip != NULL && fqdn != NULL) { @@ -1358,6 +1360,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { transAsyncSend(thrd->asyncPool, &(cliMsg->q)); } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return 0; } int64_t transAllocHandle() { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 9a511adf9b..7b9402f954 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1034,7 +1034,7 @@ void transUnrefSrvHandle(void* handle) { } } -void transReleaseSrvHandle(void* handle) { +int transReleaseSrvHandle(void* handle) { SRpcHandleInfo* info = handle; SExHandle* exh = info->handle; int64_t refId = info->refId; @@ -1053,16 +1053,16 @@ void transReleaseSrvHandle(void* handle) { tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); - return; + return 0; _return1: tTrace("handle %p failed to send to release handle", exh); transReleaseExHandle(transGetRefMgt(), refId); - return; + return -1; _return2: tTrace("handle %p failed to send to release handle", exh); - return; + return -1; } -void transSendResponse(const STransMsg* msg) { +int transSendResponse(const STransMsg* msg) { SExHandle* exh = msg->info.handle; int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); @@ -1082,18 +1082,18 @@ void transSendResponse(const STransMsg* msg) { tGTrace("conn %p start to send resp (1/2)", exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); - return; + return 0; _return1: tTrace("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); transReleaseExHandle(transGetRefMgt(), refId); - return; + return -1; _return2: tTrace("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - return; + return -1; } -void transRegisterMsg(const STransMsg* msg) { +int transRegisterMsg(const STransMsg* msg) { SExHandle* exh = msg->info.handle; int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); @@ -1112,16 +1112,17 @@ void transRegisterMsg(const STransMsg* msg) { tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); - return; + return 0; _return1: tTrace("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); transReleaseExHandle(transGetRefMgt(), refId); - return; + return -1; _return2: tTrace("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); + return -1; } int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } From c764ffb14fcb895744f094ada01f77b034444746 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 20 Jul 2022 15:22:56 +0800 Subject: [PATCH 16/25] fix rpc mem leak --- source/client/src/clientEnv.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 53a1bd2235..635d4bf2f9 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -88,7 +88,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (NEED_REDIRECT_ERROR(code)) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || - msgType == TDMT_SCH_MERGE_FETCH) { + msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT) { return false; } return true; From 85684cebb8e0f0b2215e17944570e0751f021add Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 20 Jul 2022 18:48:27 +0800 Subject: [PATCH 17/25] fix: fix mem leak --- include/libs/qcom/query.h | 4 ++-- source/libs/qcom/src/queryUtil.c | 13 ++++++++----- source/libs/scheduler/src/schRemote.c | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 4efcc9031b..cc040594b1 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -193,7 +193,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, +int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, bool persistHandle, void* ctx); /** @@ -205,7 +205,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra * @param pInfo * @return */ -int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo); int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 6b1476fe46..d8fda57791 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -148,11 +148,12 @@ void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { taosMemoryFreeClear(pMsgBody); } -int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, +int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo, bool persistHandle, void* rpcCtx) { char* pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); + destroySendMsgInfo(pInfo); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return terrno; } @@ -167,13 +168,15 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra .info.persistHandle = persistHandle, .code = 0 }; - assert(pInfo->fp != NULL); TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId); - rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); - return TSDB_CODE_SUCCESS; + int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx); + if (code) { + destroySendMsgInfo(pInfo); + } + return code; } -int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) { return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 5452ca31a5..83ea510962 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -555,7 +555,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { *fp = schHandleCallback; break; case TDMT_SCH_DROP_TASK: - *fp = schHandleDropCallback; + //*fp = schHandleDropCallback; break; case TDMT_SCH_QUERY_HEARTBEAT: *fp = schHandleHbCallback; From db37fef8988d7914fe2ad93b6615c4f4b7c09043 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Jul 2022 09:10:18 +0800 Subject: [PATCH 18/25] fix: fix crash issue --- source/libs/scheduler/src/schRemote.c | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 83ea510962..b794cb91f5 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -509,7 +509,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == msgSendInfo) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } msgSendInfo->paramFreeFp = taosMemoryFree; @@ -535,8 +535,12 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 _return: - destroySendMsgInfo(msgSendInfo); + if (msgSendInfo) { + destroySendMsgInfo(msgSendInfo); + } + taosMemoryFree(msg); + SCH_RET(code); } @@ -555,7 +559,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { *fp = schHandleCallback; break; case TDMT_SCH_DROP_TASK: - //*fp = schHandleDropCallback; + *fp = schHandleDropCallback; break; case TDMT_SCH_QUERY_HEARTBEAT: *fp = schHandleHbCallback; @@ -843,6 +847,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery int64_t transporterId = 0; code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); + pMsgSendInfo = NULL; if (code) { SCH_ERR_JRET(code); } @@ -919,7 +924,9 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) { addr.epSet.numOfEps = 1; memcpy(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep)); - SCH_ERR_JRET(schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx)); + code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx); + msg = NULL; + SCH_ERR_JRET(code); return TSDB_CODE_SUCCESS; @@ -1087,9 +1094,10 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, } SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; - SCH_ERR_JRET( - schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); - + schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); + msg = NULL; + SCH_ERR_JRET(code); + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); } From dac39371cd1130469f54cdeb561156366aec15db Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Jul 2022 10:27:39 +0800 Subject: [PATCH 19/25] fix: fix drop task memory leak --- source/client/src/clientEnv.c | 2 +- source/libs/scheduler/src/schTask.c | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 635d4bf2f9..5b96729503 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -88,7 +88,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (NEED_REDIRECT_ERROR(code)) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || - msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT) { + msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK) { return false; } return true; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 282e81bb5d..c40e56ab6f 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -102,14 +102,14 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { } int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) { - SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL}; + SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)}; if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) { SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("task execNode added, execId:%d", execId); + SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle); return TSDB_CODE_SUCCESS; } @@ -752,12 +752,18 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { return; } + int32_t i = 0; SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); while (nodeInfo) { - SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK); + if (nodeInfo->handle) { + SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK); + SCH_TASK_DLOG("start to drop task's %dth execNode", i); + } else { + SCH_TASK_DLOG("no need to drop task %dth execNode", i); + } + ++i; nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); } From 113dfdde0ae70ad88412c39a9064b59e938af436 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Jul 2022 10:38:36 +0800 Subject: [PATCH 20/25] fix: increase query thread number --- source/common/src/tglobal.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index db8afba409..76ec5b2d22 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -51,15 +51,15 @@ int32_t tsNumOfShmThreads = 1; int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 1; -int32_t tsNumOfMnodeQueryThreads = 2; +int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; -int32_t tsNumOfVnodeQueryThreads = 2; +int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeStreamThreads = 2; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; -int32_t tsNumOfQnodeQueryThreads = 2; +int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeFetchThreads = 4; int32_t tsNumOfSnodeSharedThreads = 2; int32_t tsNumOfSnodeUniqueThreads = 2; @@ -402,16 +402,16 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1; - tsNumOfMnodeQueryThreads = tsNumOfCores / 8; - tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 1, 4); + tsNumOfMnodeQueryThreads = tsNumOfCores * 2; + tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 4, 8); if (cfgAddInt32(pCfg, "numOfMnodeQueryThreads", tsNumOfMnodeQueryThreads, 1, 1024, 0) != 0) return -1; tsNumOfMnodeReadThreads = tsNumOfCores / 8; tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4); if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1; - tsNumOfVnodeQueryThreads = tsNumOfCores / 4; - tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); + tsNumOfVnodeQueryThreads = tsNumOfCores * 2; + tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeStreamThreads = tsNumOfCores / 4; @@ -430,8 +430,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; - tsNumOfQnodeQueryThreads = tsNumOfCores / 2; - tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 1); + tsNumOfQnodeQueryThreads = tsNumOfCores * 2; + tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; tsNumOfQnodeFetchThreads = tsNumOfCores / 2; From 57436e5ae26a7210bd1c80982e291d847c3f1fdc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 21 Jul 2022 10:42:00 +0800 Subject: [PATCH 21/25] avoid mem leak --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transCli.c | 20 ++++++++++++++++---- source/libs/transport/src/transComm.c | 4 +++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 9eb5135969..843798817d 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -226,6 +226,7 @@ typedef struct { int index; int nAsync; uv_async_t* asyncs; + int8_t stop; } SAsyncPool; SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 32d1e7140e..557ed548f4 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1020,6 +1020,7 @@ void cliSendQuit(SCliThrd* thrd) { SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); msg->type = Quit; transAsyncSend(thrd->asyncPool, &msg->q); + atomic_store_8(&thrd->asyncPool->stop, 1); } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { @@ -1238,7 +1239,9 @@ int transReleaseCliHandle(void* handle) { cmsg->msg = tmsg; cmsg->type = Release; - transAsyncSend(pThrd->asyncPool, &cmsg->q); + if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { + return -1; + } return 0; } @@ -1279,7 +1282,10 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); + if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) { + destroyCmsg(cliMsg); + return -1; + } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; } @@ -1323,7 +1329,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - transAsyncSend(pThrd->asyncPool, &(cliMsg->q)); + if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) { + destroyCmsg(cliMsg); + return -1; + } tsem_wait(sem); tsem_destroy(sem); taosMemoryFree(sem); @@ -1358,7 +1367,10 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid); - transAsyncSend(thrd->asyncPool, &(cliMsg->q)); + if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) { + destroyCmsg(cliMsg); + return -1; + } } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7c76f69f0c..c89bbd408b 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -177,7 +177,6 @@ int transSetConnOption(uv_tcp_t* stream) { SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); - pool->index = 0; pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); @@ -207,6 +206,9 @@ void transDestroyAsyncPool(SAsyncPool* pool) { taosMemoryFree(pool); } int transAsyncSend(SAsyncPool* pool, queue* q) { + if (atomic_load_8(&pool->stop) == 1) { + return -1; + } int idx = pool->index; idx = idx % pool->nAsync; // no need mutex here From ed1c777b4eff5ae02b879f843fb2bf54b0a01a13 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 21 Jul 2022 11:09:29 +0800 Subject: [PATCH 22/25] fix: avoid rpc mem leak --- source/libs/transport/src/transCli.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 557ed548f4..07a698f883 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1284,6 +1284,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) { destroyCmsg(cliMsg); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return -1; } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); @@ -1330,7 +1331,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) { + tsem_destroy(sem); + taosMemoryFree(sem); destroyCmsg(cliMsg); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return -1; } tsem_wait(sem); @@ -1369,6 +1373,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) { destroyCmsg(cliMsg); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return -1; } } From 3621aa55404d792a767a2026d0af1b6eea5b7a91 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Jul 2022 14:06:38 +0800 Subject: [PATCH 23/25] fix: fix mac compile issue --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 24c7d5452a..11aac2114d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2162,7 +2162,7 @@ static SSDataBlock* sysTableScanUserSTables(SOperatorInfo* pOperator) { } pInfo->pRes->info.rows = 0; - pOperator->status == OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; From de8bb6c25ca279075989f3e498120720da2dba88 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 21 Jul 2022 14:40:16 +0800 Subject: [PATCH 24/25] fix: avoid rpc mem leak --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transCli.c | 10 ++++++++-- source/libs/transport/src/transComm.c | 8 ++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 843798817d..9dd1a745d3 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -232,6 +232,7 @@ typedef struct { SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); void transDestroyAsyncPool(SAsyncPool* pool); int transAsyncSend(SAsyncPool* pool, queue* mq); +bool transAsyncPoolIsEmpty(SAsyncPool* pool); #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \ do { \ diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 07a698f883..f94a7f3c37 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -70,6 +70,8 @@ typedef struct SCliThrd { SCvtAddr cvtAddr; + SCliMsg* stopMsg; + bool quit; } SCliThrd; @@ -761,14 +763,17 @@ void cliConnCb(uv_connect_t* req, int status) { } static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { + if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) { + pThrd->stopMsg = pMsg; + return; + } + pThrd->stopMsg = NULL; pThrd->quit = true; tDebug("cli work thread %p start to quit", pThrd); destroyCmsg(pMsg); destroyConnPool(pThrd->pool); uv_timer_stop(&pThrd->timer); uv_walk(pThrd->loop, cliWalkCb, NULL); - - // uv_stop(pThrd->loop); } static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { int64_t refId = (int64_t)(pMsg->msg.info.handle); @@ -925,6 +930,7 @@ static void cliAsyncCb(uv_async_t* handle) { if (count >= 2) { tTrace("cli process batch size:%d", count); } + if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); } static void* cliWorkThread(void* arg) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index c89bbd408b..c3cba3118c 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -228,6 +228,14 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { } return uv_async_send(async); } +bool transAsyncPoolIsEmpty(SAsyncPool* pool) { + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + if (!QUEUE_IS_EMPTY(&item->qmsg)) return false; + } + return true; +} void transCtxInit(STransCtx* ctx) { // init transCtx From 6c683902ff111c9335cc9c45eaa736895a5ef612 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Jul 2022 15:23:18 +0800 Subject: [PATCH 25/25] fix: fix show create table issue --- source/libs/parser/src/parUtil.c | 6 ++++-- tests/script/api/batchprepare.c | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index f98b195039..74d5f03dc1 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -866,13 +866,15 @@ STableCfg* tableCfgDup(STableCfg* pCfg) { memcpy(pNew, pCfg, sizeof(*pNew)); if (NULL != pNew->pComment) { - pNew->pComment = strdup(pNew->pComment); + pNew->pComment = taosMemoryCalloc(pNew->commentLen + 1, 1); + memcpy(pNew->pComment, pCfg->pComment, pNew->commentLen); } if (NULL != pNew->pFuncs) { pNew->pFuncs = taosArrayDup(pNew->pFuncs); } if (NULL != pNew->pTags) { - pNew->pTags = strdup(pNew->pTags); + pNew->pTags = taosMemoryCalloc(pNew->tagsLen + 1, 1); + memcpy(pNew->pTags, pCfg->pTags, pNew->tagsLen); } int32_t schemaSize = (pCfg->numOfColumns + pCfg->numOfTags) * sizeof(SSchema); diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index e1aa1991a4..ada2039460 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -250,7 +250,7 @@ CaseCtrl gCaseCtrl = { #endif -#if 0 +#if 1 CaseCtrl gCaseCtrl = { // default .precision = TIME_PRECISION_MILLI, .bindNullNum = 0, @@ -282,7 +282,7 @@ CaseCtrl gCaseCtrl = { // default }; #endif -#if 1 +#if 0 CaseCtrl gCaseCtrl = { // query case with specified col&oper .bindNullNum = 1, .printCreateTblSql = false,