From 84e675c24862bd44c20aee73d742271bb9b0e720 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 11 Sep 2024 17:39:24 +0800 Subject: [PATCH 1/3] fix: memleak in taos (create table) --- source/client/src/clientImpl.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d77b8dcbb7..ac5866401e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1163,6 +1163,9 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows); } } + if (TSDB_CODE_SUCCESS == code) { + schedulerFreeJob(&pRequest->body.queryJob, 0); + } } taosMemoryFree(pResult); From 8af5e89d4e0a77414e15b6afb774582d3157e638 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 11 Sep 2024 17:46:53 +0800 Subject: [PATCH 2/3] enh: create from csv rm tbname hash --- source/libs/parser/src/parTranslater.c | 33 ++++---------------------- 1 file changed, 5 insertions(+), 28 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 854bf83a1f..81adfff9a1 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5993,7 +5993,7 @@ static int32_t isOperatorEqTbnameCond(STranslateContext* pCxt, SOperatorNode* pO *pRet = false; return TSDB_CODE_SUCCESS; } - + SFunctionNode* pTbnameFunc = NULL; SValueNode* pValueNode = NULL; if (nodeType(pOperator->pLeft) == QUERY_NODE_FUNCTION && @@ -6275,7 +6275,7 @@ static int32_t replaceToChildTableQuery(STranslateContext* pCxt, SEqCondTbNameTa if (NULL == pMeta || TSDB_CHILD_TABLE != pMeta->tableType || pMeta->suid != pRealTable->pMeta->suid) { goto _return; } - + pRealTable->pMeta->uid = pMeta->uid; pRealTable->pMeta->vgId = pMeta->vgId; pRealTable->pMeta->tableType = pMeta->tableType; @@ -6386,11 +6386,11 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt* } qDebug("before ctbname optimize, code:%d, aTableNum:%d, nTbls:%d, stableQuery:%d", code, aTableNum, nTbls, stableQuery); - + if (TSDB_CODE_SUCCESS == code && 1 == aTableNum && 1 == nTbls && stableQuery && NULL == pInfo->pRealTable->pTsmas) { code = replaceToChildTableQuery(pCxt, pInfo); } - + return code; } @@ -6797,7 +6797,7 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { if (pCxt->pParseCxt && pCxt->pParseCxt->setQueryFp) { (*pCxt->pParseCxt->setQueryFp)(pCxt->pParseCxt->requestRid); } - + if (NULL == pSelect->pFromTable) { return translateSelectWithoutFrom(pCxt, pSelect); } else { @@ -13801,7 +13801,6 @@ _OUT: } typedef struct SParseFileContext { - SHashObj* pTbNameHash; SArray* aTagNames; bool tagNameFilled; STableMeta* pStbMeta; @@ -13936,18 +13935,6 @@ static int32_t parseCsvFile(SMsgBuf* pMsgBuf, SParseContext* pParseCxt, SParseFi code = parseOneStbRow(pMsgBuf, pParFileCxt); - if (TSDB_CODE_SUCCESS == code) { - if (taosHashGet(pParFileCxt->pTbNameHash, pParFileCxt->ctbName.tname, strlen(pParFileCxt->ctbName.tname) + 1) != - NULL) { - taosMemoryFreeClear(pParFileCxt->pTag); - code = generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_TBNAME_DUPLICATED, pParFileCxt->ctbName.tname); - break; - } - - code = taosHashPut(pParFileCxt->pTbNameHash, pParFileCxt->ctbName.tname, strlen(pParFileCxt->ctbName.tname) + 1, - NULL, 0); - } - if (TSDB_CODE_SUCCESS == code) { code = fillVgroupInfo(pParseCxt, &pParFileCxt->ctbName, &pParFileCxt->vg); } @@ -13987,7 +13974,6 @@ static void destructParseFileContext(SParseFileContext** ppParFileCxt) { SParseFileContext* pParFileCxt = *ppParFileCxt; - taosHashCleanup(pParFileCxt->pTbNameHash); taosArrayDestroy(pParFileCxt->aTagNames); taosMemoryFreeClear(pParFileCxt->pStbMeta); taosArrayDestroy(pParFileCxt->aTagIndexs); @@ -14012,15 +13998,6 @@ static int32_t constructParseFileContext(SCreateSubTableFromFileClause* pStmt, S pParFileCxt->ctbName.acctId = acctId; strcpy(pParFileCxt->ctbName.dbname, pStmt->useDbName); - if (NULL == pParFileCxt->pTbNameHash) { - pParFileCxt->pTbNameHash = - taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK); - if (!pParFileCxt->pTbNameHash) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _ERR; - } - } - if (NULL == pParFileCxt->aTagNames) { pParFileCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN); if (NULL == pParFileCxt->aTagNames) { From 4ef4dddc7bb7f7ed2c85f590af729897f676ad66 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Fri, 13 Sep 2024 18:59:54 +0800 Subject: [PATCH 3/3] fix: schedulerFreeJob reset jobId only on the last reference --- include/util/tref.h | 1 + source/client/src/clientImpl.c | 4 +--- source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schUtil.c | 13 +++++++++++-- source/libs/scheduler/src/scheduler.c | 9 ++++++--- source/util/src/tref.c | 13 +++++++++---- 6 files changed, 29 insertions(+), 12 deletions(-) diff --git a/include/util/tref.h b/include/util/tref.h index 1520ced14e..f17fb73f3d 100644 --- a/include/util/tref.h +++ b/include/util/tref.h @@ -46,6 +46,7 @@ void *taosAcquireRef(int32_t rsetId, int64_t rid); // release ref, rid is the reference ID returned by taosAddRef // return 0 if success. On error, -1 is returned, and terrno is set appropriately int32_t taosReleaseRef(int32_t rsetId, int64_t rid); +int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t* isReleased); // return the first reference if rid is 0, otherwise return the next after current reference. // if return value is NULL, it means list is over(if terrno is set, it means error happens) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index ac5866401e..5afd8a0cbe 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1163,9 +1163,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows); } } - if (TSDB_CODE_SUCCESS == code) { - schedulerFreeJob(&pRequest->body.queryJob, 0); - } + schedulerFreeJob(&pRequest->body.queryJob, 0); } taosMemoryFree(pResult); diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 3a25f37895..8d77ba1c98 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -570,6 +570,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param); int32_t schAcquireJob(int64_t refId, SSchJob **ppJob); int32_t schReleaseJob(int64_t refId); +int32_t schReleaseJobEx(int64_t refId, int32_t* released); void schFreeFlowCtrl(SSchJob *pJob); int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 3f610ed387..bcca820dff 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -41,6 +41,15 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgmt.jobRef, refId); } +FORCE_INLINE int32_t schReleaseJobEx(int64_t refId, int32_t* released) { + if (0 == refId) { + return TSDB_CODE_SUCCESS; + } + + qDebug("sch release ex jobId:0x%" PRIx64, refId); + return taosReleaseRefEx(schMgmt.jobRef, refId, released); +} + int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) { *ppRes = NULL; if (NULL == pEpSet) { @@ -189,7 +198,7 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); return; } - + SQueryNodeEpId epId = {0}; epId.nodeId = addr->nodeId; @@ -334,7 +343,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx) { void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { *pTask = NULL; - + int32_t s = taosHashGetSize(pTaskList); if (s <= 0) { return; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3ce5cd5714..7a69691573 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -119,7 +119,7 @@ int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) { qError("failed to get task %d, total: %d", m, pLevel->taskNum); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - + SQuerySubDesc subDesc = {0}; subDesc.tid = pTask->taskId; TAOS_STRCPY(subDesc.status, jobTaskStatusStr(pTask->status)); @@ -179,8 +179,11 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) { SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode)); (void)schHandleJobDrop(pJob, errCode); // ignore any error - (void)schReleaseJob(*jobId); // ignore error - *jobId = 0; + int32_t released = false; + (void)schReleaseJobEx(*jobId, &released); // ignore error + if (released) { + *jobId = 0; + } } void schedulerDestroy(void) { diff --git a/source/util/src/tref.c b/source/util/src/tref.c index 0eac7b4427..d387156e2c 100644 --- a/source/util/src/tref.c +++ b/source/util/src/tref.c @@ -55,7 +55,7 @@ static void taosLockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy); static void taosIncRsetCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet); -static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove); +static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t* isReleased); int32_t taosOpenRef(int32_t max, RefFp fp) { SRefNode **nodeList; @@ -181,7 +181,7 @@ int64_t taosAddRef(int32_t rsetId, void *p) { return rid; } -int32_t taosRemoveRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1); } +int32_t taosRemoveRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1, NULL); } // if rid is 0, return the first p in hash list, otherwise, return the next after current rid void *taosAcquireRef(int32_t rsetId, int64_t rid) { @@ -245,7 +245,8 @@ void *taosAcquireRef(int32_t rsetId, int64_t rid) { return p; } -int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0); } +int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0, NULL); } +int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t* isReleased) { return taosDecRefCount(rsetId, rid, 0, isReleased); } // if rid is 0, return the first p in hash list, otherwise, return the next after current rid void *taosIterateRef(int32_t rsetId, int64_t rid) { @@ -372,7 +373,7 @@ int32_t taosListRef() { return num; } -static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) { +static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t* isReleased) { int32_t hash; SRefSet *pSet; SRefNode *pNode; @@ -440,6 +441,10 @@ static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) { taosDecRsetCount(pSet); } + if (isReleased) { + *isReleased = released; + } + return code; }