From 85fc4dfc2b033739a727d26036a456bda4aabcdb Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Nov 2024 10:16:15 +0800 Subject: [PATCH] fix: data sink memory issues --- include/libs/executor/dataSinkMgt.h | 2 +- include/libs/executor/executor.h | 1 + source/libs/executor/inc/dataSinkInt.h | 6 +- source/libs/executor/src/dataDeleter.c | 14 +- source/libs/executor/src/dataDispatcher.c | 15 +- source/libs/executor/src/dataInserter.c | 10 +- source/libs/executor/src/dataSinkMgt.c | 12 +- source/libs/executor/src/executor.c | 12 +- source/libs/executor/test/queryPlanTests.cpp | 2 +- source/libs/nodes/src/nodesCloneFuncs.c | 50 +++++++ source/libs/qworker/src/qworker.c | 1 + source/util/test/memPoolTest.cpp | 148 ++++++++++++++++--- 12 files changed, 230 insertions(+), 43 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 3ca7b79a6b..9267a74bb7 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -87,7 +87,7 @@ typedef struct SOutputData { * @param pHandle output * @return error code */ -int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); +int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 18e03de9f4..aa481c9b3a 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -58,6 +58,7 @@ typedef struct { struct SStorageAPI api; void* pWorkerCb; + bool localExec; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index c18eddd034..7df3439fc1 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -52,10 +52,10 @@ typedef struct SDataSinkHandle { FGetSinkFlags fGetFlags; } SDataSinkHandle; -int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle); -int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, +int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle); +int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam); -int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, +int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam); #ifdef __cplusplus diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 890940960e..4564a07967 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -41,6 +41,7 @@ typedef struct SDataCacheEntry { typedef struct SDataDeleterHandle { SDataSinkHandle sink; SDataSinkManager* pManager; + SDataSinkNode* pSinkNode; SDataBlockDescNode* pSchema; SDataDeleterNode* pDeleter; SDeleterParam* pParam; @@ -258,8 +259,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } taosCloseQueue(pDeleter->pDataBlocks); (void)taosThreadMutexDestroy(&pDeleter->mutex); - nodesDestroyNode((SNode*)pDeleter->pSchema); - pDeleter->pSchema = NULL; + nodesDestroyNode((SNode*)pDeleter->pSinkNode); + pDeleter->pSinkNode = NULL; taosMemoryFree(pDeleter->pManager); @@ -282,8 +283,9 @@ static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) { } -int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, +int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam) { + SDataSinkNode* pDataSink = *ppDataSink; int32_t code = TSDB_CODE_SUCCESS; if (pParam == NULL) { code = TSDB_CODE_QRY_INVALID_INPUT; @@ -312,7 +314,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, deleter->pManager = pManager; deleter->pDeleter = pDeleterNode; deleter->pSchema = pDataSink->pInputDataBlockDesc; - pDataSink->pInputDataBlockDesc = NULL; + deleter->pSinkNode = pDataSink; + *ppDataSink = NULL; deleter->pParam = pParam; deleter->status = DS_BUF_EMPTY; @@ -338,6 +341,9 @@ _end: } else { taosMemoryFree(pManager); } + + nodesDestroyNode((SNode *)*ppDataSink); + *ppDataSink = NULL; return code; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 2e40f39b3d..e28658cd2a 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -43,6 +43,7 @@ typedef struct SDataDispatchHandle { SDataSinkHandle sink; SDataSinkManager* pManager; SDataBlockDescNode* pSchema; + SDataSinkNode* pSinkNode; STaosQueue* pDataBlocks; SDataDispatchBuf nextOutput; int32_t outPutColCounts; @@ -356,7 +357,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); taosMemoryFreeClear(pDispatcher->nextOutput.pData); - nodesDestroyNode((SNode*)pDispatcher->pSchema); + nodesDestroyNode((SNode*)pDispatcher->pSinkNode); while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; @@ -437,12 +438,13 @@ int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) { return numOfCols; } -int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { +int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle) { int32_t code; + SDataSinkNode* pDataSink = *ppDataSink; code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc); if (code) { qError("failed to check input data block desc, code:%d", code); - return code; + goto _return; } SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); @@ -461,7 +463,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSin dispatcher->pManager = pManager; pManager = NULL; dispatcher->pSchema = pDataSink->pInputDataBlockDesc; - pDataSink->pInputDataBlockDesc = NULL; + dispatcher->pSinkNode = pDataSink; + *ppDataSink = NULL; dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema); dispatcher->status = DS_BUF_EMPTY; dispatcher->queryEnd = false; @@ -488,5 +491,9 @@ _return: if (dispatcher) { dsDestroyDataSinker(dispatcher); } + + nodesDestroyNode((SNode *)*ppDataSink); + *ppDataSink = NULL; + return terrno; } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 5c98b825d8..ec041cba70 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -435,6 +435,9 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { taosMemoryFree(pInserter->pSchema); taosMemoryFree(pInserter->pParam); taosHashCleanup(pInserter->pCols); + nodesDestroyNode((SNode *)pInserter->pNode); + pInserter->pNode = NULL; + (void)taosThreadMutexDestroy(&pInserter->mutex); taosMemoryFree(pInserter->pManager); @@ -455,8 +458,9 @@ static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) { return TSDB_CODE_SUCCESS; } -int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, +int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam) { + SDataSinkNode* pDataSink = *ppDataSink; SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle)); if (NULL == inserter) { taosMemoryFree(pParam); @@ -477,6 +481,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat inserter->status = DS_BUF_EMPTY; inserter->queryEnd = false; inserter->explain = pInserterNode->explain; + *ppDataSink = NULL; int64_t suid = 0; int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, @@ -530,5 +535,8 @@ _return: taosMemoryFree(pManager); } + nodesDestroyNode((SNode *)*ppDataSink); + *ppDataSink = NULL; + return terrno; } diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 06f5b5cce6..255dc901d6 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -39,23 +39,23 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) { return TSDB_CODE_SUCCESS; } -int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { +int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { SDataSinkManager* pManager = pSinkManager; - switch ((int)nodeType(pDataSink)) { + switch ((int)nodeType(*ppDataSink)) { case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: - return createDataDispatcher(pManager, pDataSink, pHandle); + return createDataDispatcher(pManager, ppDataSink, pHandle); case QUERY_NODE_PHYSICAL_PLAN_DELETE: { - return createDataDeleter(pManager, pDataSink, pHandle, pParam); + return createDataDeleter(pManager, ppDataSink, pHandle, pParam); } case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { - return createDataInserter(pManager, pDataSink, pHandle, pParam); + return createDataInserter(pManager, ppDataSink, pHandle, pParam); } default: break; } taosMemoryFree(pSinkManager); - qError("invalid input node type:%d, %s", nodeType(pDataSink), id); + qError("invalid input node type:%d, %s", nodeType(*ppDataSink), id); return TSDB_CODE_QRY_INVALID_INPUT; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8233195642..d64138f705 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -636,8 +636,18 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, goto _error; } + SDataSinkNode* pSink = NULL; + if (readHandle->localExec) { + code = nodesCloneNode((SNode *)pSubplan->pDataSink, (SNode **)&pSink); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code), (*pTask)->id.str); + taosMemoryFree(pSinkManager); + goto _error; + } + } + // pSinkParam has been freed during create sinker. - code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); + code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); if (code) { qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code)); } diff --git a/source/libs/executor/test/queryPlanTests.cpp b/source/libs/executor/test/queryPlanTests.cpp index 940a5f4d35..d168f76b10 100755 --- a/source/libs/executor/test/queryPlanTests.cpp +++ b/source/libs/executor/test/queryPlanTests.cpp @@ -3133,7 +3133,7 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: case QUERY_NODE_PHYSICAL_PLAN_DELETE: { DataSinkHandle handle = NULL; - qptCtx.result.code = dsCreateDataSinker(NULL, (SDataSinkNode*)pNode, &handle, NULL, NULL); + qptCtx.result.code = dsCreateDataSinker(NULL, (SDataSinkNode**)&pNode, &handle, NULL, NULL); dsDestroyDataSinker(handle); break; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 8c314a14b8..b3c5446563 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -831,6 +831,44 @@ static int32_t physiProjectCopy(const SProjectPhysiNode* pSrc, SProjectPhysiNode return TSDB_CODE_SUCCESS; } +static int32_t dataSinkNodeCopy(const SDataSinkNode* pSrc, SDataSinkNode* pDst) { + CLONE_NODE_FIELD_EX(pInputDataBlockDesc, SDataBlockDescNode*); + return TSDB_CODE_SUCCESS; +} + +static int32_t physiDispatchCopy(const SDataDispatcherNode* pSrc, SDataDispatcherNode* pDst) { + COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy); + return TSDB_CODE_SUCCESS; +} + +static int32_t physiInserterCopy(const SDataInserterNode* pSrc, SDataInserterNode* pDst) { + COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy); + return TSDB_CODE_SUCCESS; +} + +static int32_t physiQueryInserterCopy(const SQueryInserterNode* pSrc, SQueryInserterNode* pDst) { + COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy); + CLONE_NODE_LIST_FIELD(pCols); + COPY_SCALAR_FIELD(tableId); + COPY_SCALAR_FIELD(stableId); + COPY_SCALAR_FIELD(tableType); + COPY_CHAR_ARRAY_FIELD(tableName); + COPY_SCALAR_FIELD(vgId); + COPY_OBJECT_FIELD(epSet, sizeof(SEpSet)); + COPY_SCALAR_FIELD(explain); + return TSDB_CODE_SUCCESS; +} + +static int32_t physiDeleterCopy(const SDataDeleterNode* pSrc, SDataDeleterNode* pDst) { + COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy); + COPY_SCALAR_FIELD(tableId); + COPY_SCALAR_FIELD(tableType); + COPY_CHAR_ARRAY_FIELD(tableFName); + COPY_CHAR_ARRAY_FIELD(tsColName); + COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow)); + return TSDB_CODE_SUCCESS; +} + static int32_t dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) { COPY_SCALAR_FIELD(dataBlockId); CLONE_NODE_LIST_FIELD(pSlots); @@ -1086,6 +1124,18 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: code = physiProjectCopy((const SProjectPhysiNode*)pNode, (SProjectPhysiNode*)pDst); break; + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + code = physiDispatchCopy((const SDataDispatcherNode*)pNode, (SDataDispatcherNode*)pDst); + break; + //case QUERY_NODE_PHYSICAL_PLAN_INSERT: + // code = physiInserterCopy((const SDataInserterNode*)pNode, (SDataInserterNode*)pDst); + // break; + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + code = physiQueryInserterCopy((const SQueryInserterNode*)pNode, (SQueryInserterNode*)pDst); + break; + case QUERY_NODE_PHYSICAL_PLAN_DELETE: + code = physiDeleterCopy((const SDataDeleterNode*)pNode, (SDataDeleterNode*)pDst); + break; default: break; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index d6db77990d..2e5f6c766f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1525,6 +1525,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 } rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; + rHandle.localExec = true; code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 5c275ffc5e..ffc20cff7a 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -54,10 +54,68 @@ namespace { #define MPT_MIN_MEM_POOL_SIZE (1048576UL) #define MPT_MAX_RETIRE_JOB_NUM 10000 +enum { + MPT_READ = 1, + MPT_WRITE, +}; + threadlocal void* mptThreadPoolHandle = NULL; threadlocal void* mptThreadPoolSession = NULL; +#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 + +#define MPT_LOCK(type, _lock) \ + do { \ + if (MPT_READ == (type)) { \ + if (atomic_load_32((_lock)) < 0) { \ + uError("invalid lock value before read lock"); \ + break; \ + } \ + taosRLockLatch(_lock); \ + if (atomic_load_32((_lock)) <= 0) { \ + uError("invalid lock value after read lock"); \ + break; \ + } \ + } else { \ + if (atomic_load_32((_lock)) < 0) { \ + uError("invalid lock value before write lock"); \ + break; \ + } \ + taosWLockLatch(_lock); \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + uError("invalid lock value after write lock"); \ + break; \ + } \ + } \ + } while (0) + +#define MPT_UNLOCK(type, _lock) \ + do { \ + if (MPT_READ == (type)) { \ + if (atomic_load_32((_lock)) <= 0) { \ + uError("invalid lock value before read unlock"); \ + break; \ + } \ + taosRUnLockLatch(_lock); \ + if (atomic_load_32((_lock)) < 0) { \ + uError("invalid lock value after read unlock"); \ + break; \ + } \ + } else { \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + uError("invalid lock value before write unlock"); \ + break; \ + } \ + taosWUnLockLatch(_lock); \ + if (atomic_load_32((_lock)) < 0) { \ + uError("invalid lock value after write unlock"); \ + break; \ + } \ + } \ + } while (0) + + #define MPT_SET_TEID(id, tId, eId) \ do { \ *(uint64_t *)(id) = (tId); \ @@ -103,8 +161,11 @@ typedef struct SMPTJobInfo { int8_t retired; int32_t errCode; SMemPoolJob* memInfo; - SHashObj* pSessions; void* pCtx; + + SRWLatch lock; + int8_t destroyed; + SHashObj* pSessions; } SMPTJobInfo; @@ -128,7 +189,7 @@ typedef struct { typedef struct { uint64_t taskId; SRWLatch taskExecLock; - bool taskFinished; + bool destoryed; int64_t poolMaxUsedSize; int64_t poolTotalUsedSize; @@ -290,6 +351,8 @@ void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) { } taosMemFreeClear(pTask->pMemList); taosMemFreeClear(pTask->npMemList); + + pTask->destoryed = true; } @@ -383,6 +446,8 @@ void mptInitTask(int32_t idx, int32_t eId, SMPTestJobCtx* pJob) { pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList)); ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList); + + pJob->taskCtxs[idx].destoryed = false; uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx); } @@ -401,6 +466,37 @@ void mptInitJob(int32_t idx) { uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum); } +void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) { + SMPTJobInfo *pJobInfo = pJobCtx->pJob; + char id[sizeof(tId) + sizeof(eId) + 1] = {0}; + MPT_SET_TEID(id, tId, eId); + int32_t remainSessions = 0; + + (void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id)); + + taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions); + + if (0 == remainSessions) { + MPT_LOCK(MPT_WRITE, &pJobInfo->lock); + if (0 == taosHashGetSize(pJobInfo->pSessions)) { + atomic_store_8(&pJobInfo->destroyed, 1); + + uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobInfo->errCode); + + mptDestroyJobInfo(pJobInfo); + MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock); + + pJobCtx->pJob = NULL; + + (void)taosHashRemove(mptCtx.pJobs, &qId, sizeof(qId)); + uInfo("the whole query job removed"); + } else { + MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock); + } + } +} + + int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { if (taosWTryLockLatch(&pJobCtx->jobExecLock)) { return -1; @@ -408,22 +504,25 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { uint64_t jobId = pJobCtx->jobId; for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { - SMPStatDetail* pStat = NULL; - int64_t allocSize = 0; - taosMemPoolGetSessionStat(pJobCtx->pSessions[i], &pStat, &allocSize, NULL); - int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); + if (!pJobCtx->taskCtxs[i].destoryed) { + SMPStatDetail* pStat = NULL; + int64_t allocSize = 0; + taosMemPoolGetSessionStat(pJobCtx->pSessions[i], &pStat, &allocSize, NULL); + int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); - assert(allocSize == usedSize); - assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat))); - - mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]); - taosMemPoolDestroySession(gMemPoolHandle, pJobCtx->pSessions[i], NULL); + assert(allocSize == usedSize); + assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat))); + + mptDestroySession(pJobCtx->jobId, pJobCtx->taskCtxs[i].taskId, 0, i, pJobCtx, pJobCtx->pSessions[i]); + pJobCtx->pSessions[i] = NULL; + + mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]); + } } - uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode); - mptDestroyJobInfo(pJobCtx->pJob); - (void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId)); + //mptDestroyJobInfo(pJobCtx->pJob); + //(void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId)); if (reset) { int32_t jobIdx = pJobCtx->jobIdx; @@ -945,21 +1044,26 @@ void mptCheckPoolUsedSize(int32_t jobNum) { for (int32_t i = 0; i < jobNum; ++i) { SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; + if (NULL == pJobCtx->pJob) { + continue; + } while (taosRTryLockLatch(&pJobCtx->jobExecLock)) { } int64_t jobUsedSize = 0; for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { - SMPStatDetail* pStat = NULL; - int64_t allocSize = 0; - taosMemPoolGetSessionStat(pJobCtx->pSessions[m], &pStat, &allocSize, NULL); - int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); - - assert(allocSize == usedSize); - assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[m].stat, sizeof(*pStat))); + if (!pJobCtx->taskCtxs[m].destoryed) { + SMPStatDetail* pStat = NULL; + int64_t allocSize = 0; + taosMemPoolGetSessionStat(pJobCtx->pSessions[m], &pStat, &allocSize, NULL); + int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); + + assert(allocSize == usedSize); + assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[m].stat, sizeof(*pStat))); - jobUsedSize += allocSize; + jobUsedSize += allocSize; + } } assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize);