fix: data sink memory issues

This commit is contained in:
dapan1121 2024-11-15 10:16:15 +08:00
parent 40f9e366e9
commit 85fc4dfc2b
12 changed files with 230 additions and 43 deletions

View File

@ -87,7 +87,7 @@ typedef struct SOutputData {
* @param pHandle output * @param pHandle output
* @return error code * @return error code
*/ */
int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat); int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat);

View File

@ -58,6 +58,7 @@ typedef struct {
struct SStorageAPI api; struct SStorageAPI api;
void* pWorkerCb; void* pWorkerCb;
bool localExec;
} SReadHandle; } SReadHandle;
// in queue mode, data streams are seperated by msg // in queue mode, data streams are seperated by msg

View File

@ -52,10 +52,10 @@ typedef struct SDataSinkHandle {
FGetSinkFlags fGetFlags; FGetSinkFlags fGetFlags;
} SDataSinkHandle; } SDataSinkHandle;
int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle); int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle);
int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
void* pParam); void* pParam);
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
void* pParam); void* pParam);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -41,6 +41,7 @@ typedef struct SDataCacheEntry {
typedef struct SDataDeleterHandle { typedef struct SDataDeleterHandle {
SDataSinkHandle sink; SDataSinkHandle sink;
SDataSinkManager* pManager; SDataSinkManager* pManager;
SDataSinkNode* pSinkNode;
SDataBlockDescNode* pSchema; SDataBlockDescNode* pSchema;
SDataDeleterNode* pDeleter; SDataDeleterNode* pDeleter;
SDeleterParam* pParam; SDeleterParam* pParam;
@ -258,8 +259,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
} }
taosCloseQueue(pDeleter->pDataBlocks); taosCloseQueue(pDeleter->pDataBlocks);
(void)taosThreadMutexDestroy(&pDeleter->mutex); (void)taosThreadMutexDestroy(&pDeleter->mutex);
nodesDestroyNode((SNode*)pDeleter->pSchema); nodesDestroyNode((SNode*)pDeleter->pSinkNode);
pDeleter->pSchema = NULL; pDeleter->pSinkNode = NULL;
taosMemoryFree(pDeleter->pManager); taosMemoryFree(pDeleter->pManager);
@ -282,8 +283,9 @@ static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
} }
int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
void* pParam) { void* pParam) {
SDataSinkNode* pDataSink = *ppDataSink;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pParam == NULL) { if (pParam == NULL) {
code = TSDB_CODE_QRY_INVALID_INPUT; code = TSDB_CODE_QRY_INVALID_INPUT;
@ -312,7 +314,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink,
deleter->pManager = pManager; deleter->pManager = pManager;
deleter->pDeleter = pDeleterNode; deleter->pDeleter = pDeleterNode;
deleter->pSchema = pDataSink->pInputDataBlockDesc; deleter->pSchema = pDataSink->pInputDataBlockDesc;
pDataSink->pInputDataBlockDesc = NULL; deleter->pSinkNode = pDataSink;
*ppDataSink = NULL;
deleter->pParam = pParam; deleter->pParam = pParam;
deleter->status = DS_BUF_EMPTY; deleter->status = DS_BUF_EMPTY;
@ -339,5 +342,8 @@ _end:
taosMemoryFree(pManager); taosMemoryFree(pManager);
} }
nodesDestroyNode((SNode *)*ppDataSink);
*ppDataSink = NULL;
return code; return code;
} }

View File

@ -43,6 +43,7 @@ typedef struct SDataDispatchHandle {
SDataSinkHandle sink; SDataSinkHandle sink;
SDataSinkManager* pManager; SDataSinkManager* pManager;
SDataBlockDescNode* pSchema; SDataBlockDescNode* pSchema;
SDataSinkNode* pSinkNode;
STaosQueue* pDataBlocks; STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput; SDataDispatchBuf nextOutput;
int32_t outPutColCounts; int32_t outPutColCounts;
@ -356,7 +357,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize); (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDispatcher->cachedSize);
taosMemoryFreeClear(pDispatcher->nextOutput.pData); taosMemoryFreeClear(pDispatcher->nextOutput.pData);
nodesDestroyNode((SNode*)pDispatcher->pSchema); nodesDestroyNode((SNode*)pDispatcher->pSinkNode);
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
SDataDispatchBuf* pBuf = NULL; SDataDispatchBuf* pBuf = NULL;
@ -437,12 +438,13 @@ int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
return numOfCols; return numOfCols;
} }
int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle) { int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle) {
int32_t code; int32_t code;
SDataSinkNode* pDataSink = *ppDataSink;
code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc); code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
if (code) { if (code) {
qError("failed to check input data block desc, code:%d", code); qError("failed to check input data block desc, code:%d", code);
return code; goto _return;
} }
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
@ -461,7 +463,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSin
dispatcher->pManager = pManager; dispatcher->pManager = pManager;
pManager = NULL; pManager = NULL;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
pDataSink->pInputDataBlockDesc = NULL; dispatcher->pSinkNode = pDataSink;
*ppDataSink = NULL;
dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema); dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
dispatcher->status = DS_BUF_EMPTY; dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false; dispatcher->queryEnd = false;
@ -488,5 +491,9 @@ _return:
if (dispatcher) { if (dispatcher) {
dsDestroyDataSinker(dispatcher); dsDestroyDataSinker(dispatcher);
} }
nodesDestroyNode((SNode *)*ppDataSink);
*ppDataSink = NULL;
return terrno; return terrno;
} }

View File

@ -435,6 +435,9 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
taosMemoryFree(pInserter->pSchema); taosMemoryFree(pInserter->pSchema);
taosMemoryFree(pInserter->pParam); taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols); taosHashCleanup(pInserter->pCols);
nodesDestroyNode((SNode *)pInserter->pNode);
pInserter->pNode = NULL;
(void)taosThreadMutexDestroy(&pInserter->mutex); (void)taosThreadMutexDestroy(&pInserter->mutex);
taosMemoryFree(pInserter->pManager); taosMemoryFree(pInserter->pManager);
@ -455,8 +458,9 @@ static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, int32_t createDataInserter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
void* pParam) { void* pParam) {
SDataSinkNode* pDataSink = *ppDataSink;
SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle)); SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
if (NULL == inserter) { if (NULL == inserter) {
taosMemoryFree(pParam); taosMemoryFree(pParam);
@ -477,6 +481,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
inserter->status = DS_BUF_EMPTY; inserter->status = DS_BUF_EMPTY;
inserter->queryEnd = false; inserter->queryEnd = false;
inserter->explain = pInserterNode->explain; inserter->explain = pInserterNode->explain;
*ppDataSink = NULL;
int64_t suid = 0; int64_t suid = 0;
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId,
@ -530,5 +535,8 @@ _return:
taosMemoryFree(pManager); taosMemoryFree(pManager);
} }
nodesDestroyNode((SNode *)*ppDataSink);
*ppDataSink = NULL;
return terrno; return terrno;
} }

View File

@ -39,23 +39,23 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
SDataSinkManager* pManager = pSinkManager; SDataSinkManager* pManager = pSinkManager;
switch ((int)nodeType(pDataSink)) { switch ((int)nodeType(*ppDataSink)) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return createDataDispatcher(pManager, pDataSink, pHandle); return createDataDispatcher(pManager, ppDataSink, pHandle);
case QUERY_NODE_PHYSICAL_PLAN_DELETE: { case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
return createDataDeleter(pManager, pDataSink, pHandle, pParam); return createDataDeleter(pManager, ppDataSink, pHandle, pParam);
} }
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
return createDataInserter(pManager, pDataSink, pHandle, pParam); return createDataInserter(pManager, ppDataSink, pHandle, pParam);
} }
default: default:
break; break;
} }
taosMemoryFree(pSinkManager); taosMemoryFree(pSinkManager);
qError("invalid input node type:%d, %s", nodeType(pDataSink), id); qError("invalid input node type:%d, %s", nodeType(*ppDataSink), id);
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }

View File

@ -636,8 +636,18 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
goto _error; goto _error;
} }
SDataSinkNode* pSink = NULL;
if (readHandle->localExec) {
code = nodesCloneNode((SNode *)pSubplan->pDataSink, (SNode **)&pSink);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code), (*pTask)->id.str);
taosMemoryFree(pSinkManager);
goto _error;
}
}
// pSinkParam has been freed during create sinker. // pSinkParam has been freed during create sinker.
code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
if (code) { if (code) {
qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code)); qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
} }

View File

@ -3133,7 +3133,7 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
case QUERY_NODE_PHYSICAL_PLAN_DELETE: { case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
DataSinkHandle handle = NULL; DataSinkHandle handle = NULL;
qptCtx.result.code = dsCreateDataSinker(NULL, (SDataSinkNode*)pNode, &handle, NULL, NULL); qptCtx.result.code = dsCreateDataSinker(NULL, (SDataSinkNode**)&pNode, &handle, NULL, NULL);
dsDestroyDataSinker(handle); dsDestroyDataSinker(handle);
break; break;
} }

View File

@ -831,6 +831,44 @@ static int32_t physiProjectCopy(const SProjectPhysiNode* pSrc, SProjectPhysiNode
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t dataSinkNodeCopy(const SDataSinkNode* pSrc, SDataSinkNode* pDst) {
CLONE_NODE_FIELD_EX(pInputDataBlockDesc, SDataBlockDescNode*);
return TSDB_CODE_SUCCESS;
}
static int32_t physiDispatchCopy(const SDataDispatcherNode* pSrc, SDataDispatcherNode* pDst) {
COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy);
return TSDB_CODE_SUCCESS;
}
static int32_t physiInserterCopy(const SDataInserterNode* pSrc, SDataInserterNode* pDst) {
COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy);
return TSDB_CODE_SUCCESS;
}
static int32_t physiQueryInserterCopy(const SQueryInserterNode* pSrc, SQueryInserterNode* pDst) {
COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy);
CLONE_NODE_LIST_FIELD(pCols);
COPY_SCALAR_FIELD(tableId);
COPY_SCALAR_FIELD(stableId);
COPY_SCALAR_FIELD(tableType);
COPY_CHAR_ARRAY_FIELD(tableName);
COPY_SCALAR_FIELD(vgId);
COPY_OBJECT_FIELD(epSet, sizeof(SEpSet));
COPY_SCALAR_FIELD(explain);
return TSDB_CODE_SUCCESS;
}
static int32_t physiDeleterCopy(const SDataDeleterNode* pSrc, SDataDeleterNode* pDst) {
COPY_BASE_OBJECT_FIELD(sink, dataSinkNodeCopy);
COPY_SCALAR_FIELD(tableId);
COPY_SCALAR_FIELD(tableType);
COPY_CHAR_ARRAY_FIELD(tableFName);
COPY_CHAR_ARRAY_FIELD(tsColName);
COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow));
return TSDB_CODE_SUCCESS;
}
static int32_t dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) { static int32_t dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) {
COPY_SCALAR_FIELD(dataBlockId); COPY_SCALAR_FIELD(dataBlockId);
CLONE_NODE_LIST_FIELD(pSlots); CLONE_NODE_LIST_FIELD(pSlots);
@ -1086,6 +1124,18 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
code = physiProjectCopy((const SProjectPhysiNode*)pNode, (SProjectPhysiNode*)pDst); code = physiProjectCopy((const SProjectPhysiNode*)pNode, (SProjectPhysiNode*)pDst);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = physiDispatchCopy((const SDataDispatcherNode*)pNode, (SDataDispatcherNode*)pDst);
break;
//case QUERY_NODE_PHYSICAL_PLAN_INSERT:
// code = physiInserterCopy((const SDataInserterNode*)pNode, (SDataInserterNode*)pDst);
// break;
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
code = physiQueryInserterCopy((const SQueryInserterNode*)pNode, (SQueryInserterNode*)pDst);
break;
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
code = physiDeleterCopy((const SDataDeleterNode*)pNode, (SDataDeleterNode*)pDst);
break;
default: default:
break; break;
} }

View File

@ -1525,6 +1525,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
} }
rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;
rHandle.localExec = true;
code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) { if (code) {

View File

@ -54,10 +54,68 @@ namespace {
#define MPT_MIN_MEM_POOL_SIZE (1048576UL) #define MPT_MIN_MEM_POOL_SIZE (1048576UL)
#define MPT_MAX_RETIRE_JOB_NUM 10000 #define MPT_MAX_RETIRE_JOB_NUM 10000
enum {
MPT_READ = 1,
MPT_WRITE,
};
threadlocal void* mptThreadPoolHandle = NULL; threadlocal void* mptThreadPoolHandle = NULL;
threadlocal void* mptThreadPoolSession = NULL; threadlocal void* mptThreadPoolSession = NULL;
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define MPT_LOCK(type, _lock) \
do { \
if (MPT_READ == (type)) { \
if (atomic_load_32((_lock)) < 0) { \
uError("invalid lock value before read lock"); \
break; \
} \
taosRLockLatch(_lock); \
if (atomic_load_32((_lock)) <= 0) { \
uError("invalid lock value after read lock"); \
break; \
} \
} else { \
if (atomic_load_32((_lock)) < 0) { \
uError("invalid lock value before write lock"); \
break; \
} \
taosWLockLatch(_lock); \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
uError("invalid lock value after write lock"); \
break; \
} \
} \
} while (0)
#define MPT_UNLOCK(type, _lock) \
do { \
if (MPT_READ == (type)) { \
if (atomic_load_32((_lock)) <= 0) { \
uError("invalid lock value before read unlock"); \
break; \
} \
taosRUnLockLatch(_lock); \
if (atomic_load_32((_lock)) < 0) { \
uError("invalid lock value after read unlock"); \
break; \
} \
} else { \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
uError("invalid lock value before write unlock"); \
break; \
} \
taosWUnLockLatch(_lock); \
if (atomic_load_32((_lock)) < 0) { \
uError("invalid lock value after write unlock"); \
break; \
} \
} \
} while (0)
#define MPT_SET_TEID(id, tId, eId) \ #define MPT_SET_TEID(id, tId, eId) \
do { \ do { \
*(uint64_t *)(id) = (tId); \ *(uint64_t *)(id) = (tId); \
@ -103,8 +161,11 @@ typedef struct SMPTJobInfo {
int8_t retired; int8_t retired;
int32_t errCode; int32_t errCode;
SMemPoolJob* memInfo; SMemPoolJob* memInfo;
SHashObj* pSessions;
void* pCtx; void* pCtx;
SRWLatch lock;
int8_t destroyed;
SHashObj* pSessions;
} SMPTJobInfo; } SMPTJobInfo;
@ -128,7 +189,7 @@ typedef struct {
typedef struct { typedef struct {
uint64_t taskId; uint64_t taskId;
SRWLatch taskExecLock; SRWLatch taskExecLock;
bool taskFinished; bool destoryed;
int64_t poolMaxUsedSize; int64_t poolMaxUsedSize;
int64_t poolTotalUsedSize; int64_t poolTotalUsedSize;
@ -290,6 +351,8 @@ void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) {
} }
taosMemFreeClear(pTask->pMemList); taosMemFreeClear(pTask->pMemList);
taosMemFreeClear(pTask->npMemList); taosMemFreeClear(pTask->npMemList);
pTask->destoryed = true;
} }
@ -384,6 +447,8 @@ void mptInitTask(int32_t idx, int32_t eId, SMPTestJobCtx* pJob) {
pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList)); pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList));
ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList); ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList);
pJob->taskCtxs[idx].destoryed = false;
uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx); uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx);
} }
@ -401,6 +466,37 @@ void mptInitJob(int32_t idx) {
uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum); uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum);
} }
void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) {
SMPTJobInfo *pJobInfo = pJobCtx->pJob;
char id[sizeof(tId) + sizeof(eId) + 1] = {0};
MPT_SET_TEID(id, tId, eId);
int32_t remainSessions = 0;
(void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id));
taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions);
if (0 == remainSessions) {
MPT_LOCK(MPT_WRITE, &pJobInfo->lock);
if (0 == taosHashGetSize(pJobInfo->pSessions)) {
atomic_store_8(&pJobInfo->destroyed, 1);
uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobInfo->errCode);
mptDestroyJobInfo(pJobInfo);
MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock);
pJobCtx->pJob = NULL;
(void)taosHashRemove(mptCtx.pJobs, &qId, sizeof(qId));
uInfo("the whole query job removed");
} else {
MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock);
}
}
}
int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
if (taosWTryLockLatch(&pJobCtx->jobExecLock)) { if (taosWTryLockLatch(&pJobCtx->jobExecLock)) {
return -1; return -1;
@ -408,22 +504,25 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
uint64_t jobId = pJobCtx->jobId; uint64_t jobId = pJobCtx->jobId;
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
SMPStatDetail* pStat = NULL; if (!pJobCtx->taskCtxs[i].destoryed) {
int64_t allocSize = 0; SMPStatDetail* pStat = NULL;
taosMemPoolGetSessionStat(pJobCtx->pSessions[i], &pStat, &allocSize, NULL); int64_t allocSize = 0;
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); taosMemPoolGetSessionStat(pJobCtx->pSessions[i], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize); assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat))); assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat)));
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]); mptDestroySession(pJobCtx->jobId, pJobCtx->taskCtxs[i].taskId, 0, i, pJobCtx, pJobCtx->pSessions[i]);
taosMemPoolDestroySession(gMemPoolHandle, pJobCtx->pSessions[i], NULL); pJobCtx->pSessions[i] = NULL;
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]);
}
} }
uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode);
mptDestroyJobInfo(pJobCtx->pJob); //mptDestroyJobInfo(pJobCtx->pJob);
(void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId)); //(void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId));
if (reset) { if (reset) {
int32_t jobIdx = pJobCtx->jobIdx; int32_t jobIdx = pJobCtx->jobIdx;
@ -945,21 +1044,26 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
for (int32_t i = 0; i < jobNum; ++i) { for (int32_t i = 0; i < jobNum; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
if (NULL == pJobCtx->pJob) {
continue;
}
while (taosRTryLockLatch(&pJobCtx->jobExecLock)) { while (taosRTryLockLatch(&pJobCtx->jobExecLock)) {
} }
int64_t jobUsedSize = 0; int64_t jobUsedSize = 0;
for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { for (int32_t m = 0; m < pJobCtx->taskNum; ++m) {
SMPStatDetail* pStat = NULL; if (!pJobCtx->taskCtxs[m].destoryed) {
int64_t allocSize = 0; SMPStatDetail* pStat = NULL;
taosMemPoolGetSessionStat(pJobCtx->pSessions[m], &pStat, &allocSize, NULL); int64_t allocSize = 0;
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); taosMemPoolGetSessionStat(pJobCtx->pSessions[m], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize); assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[m].stat, sizeof(*pStat))); assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[m].stat, sizeof(*pStat)));
jobUsedSize += allocSize; jobUsedSize += allocSize;
}
} }
assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize); assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize);