diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 97bb7a29b8..c18eddd034 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -53,7 +53,7 @@ typedef struct SDataSinkHandle { } SDataSinkHandle; int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle); -int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, +int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam); int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index be1f580a93..890940960e 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -258,6 +258,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } taosCloseQueue(pDeleter->pDataBlocks); (void)taosThreadMutexDestroy(&pDeleter->mutex); + nodesDestroyNode((SNode*)pDeleter->pSchema); + pDeleter->pSchema = NULL; taosMemoryFree(pDeleter->pManager); @@ -280,7 +282,7 @@ static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) { } -int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, +int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { int32_t code = TSDB_CODE_SUCCESS; if (pParam == NULL) { @@ -310,6 +312,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData deleter->pManager = pManager; deleter->pDeleter = pDeleterNode; deleter->pSchema = pDataSink->pInputDataBlockDesc; + pDataSink->pInputDataBlockDesc = NULL; deleter->pParam = pParam; deleter->status = DS_BUF_EMPTY; diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index a982253606..5c275ffc5e 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -392,7 +392,7 @@ void mptInitJob(int32_t idx) { pJobCtx->jobIdx = idx; pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1); - pJobCtx->taskNum = (mptCtrl.jobTaskNum) ? mptCtrl.jobTaskNum : (taosRand() % MPT_MAX_SESSION_NUM) + 1; + pJobCtx->taskNum = (mptCtrl.jobTaskNum) ? mptCtrl.jobTaskNum : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_SESSION_NUM/10)) : (taosRand() % MPT_MAX_SESSION_NUM)) + 1; for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { mptInitTask(i, 0, pJobCtx); assert(pJobCtx->pJob); @@ -853,8 +853,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } } -void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { - int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : taosRand() % MPT_MAX_MEM_ACT_TIMES; +void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask, int32_t actTimes) { uDebug("JOB:0x%x TASK:0x%x will start total %d actions", pJobCtx->jobId, pTask->taskId, actTimes); for (int32_t i = 0; i < actTimes; ++i) { @@ -862,6 +861,8 @@ void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { uDebug("JOB:0x%x TASK:0x%x stop running cause of job already retired", pJobCtx->jobId, pTask->taskId); return; } + + //MPT_PRINTF("\tTASK:0x%x will start %d:%d actions\n", pTask->taskId, i, actTimes); mptSimulateAction(pJobCtx, pTask); } @@ -894,7 +895,7 @@ void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } -void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) { +void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx, int32_t actTimes) { uDebug("JOB:0x%x TASK:0x%x start running", pJobCtx->jobId, pCtx->taskId); if (atomic_load_8(&pJobCtx->pJob->retired)) { @@ -910,7 +911,7 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) { atomic_add_fetch_32(&pJobCtx->taskRunningNum, 1); mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[idx]); - mptSimulateTask(pJobCtx, pCtx); + mptSimulateTask(pJobCtx, pCtx, actTimes); mptDisableMemoryPoolUsage(); if (!atomic_load_8(&pJobCtx->pJob->retired)) { @@ -979,11 +980,11 @@ void* mptThreadFunc(void* param) { int32_t jobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : MPT_MAX_JOB_NUM; for (int32_t n = 0; n < jobExecTimes; ++n) { - MPT_PRINTF("Thread %d start the %d:%d job loops\n", pThread->idx, n, jobExecTimes); + MPT_PRINTF("Thread %d start the %d:%d job loops - jobNum:%d\n", pThread->idx, n, jobExecTimes, jobNum); for (int32_t i = 0; i < jobNum; ++i) { SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; - MPT_PRINTF(" Thread %d start %dth JOB 0x%x exec\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId); + MPT_PRINTF(" Thread %d start to run %d:%d job[%d:0x%" PRIx64 "]\n", pThread->idx, i, jobNum, pJobCtx->jobIdx, pJobCtx->jobId); if (mptResetJob(pJobCtx)) { continue; @@ -994,8 +995,9 @@ void* mptThreadFunc(void* param) { } if (mptCtx.param.randTask) { + int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES)); int32_t taskIdx = taosRand() % pJobCtx->taskNum; - mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx); + mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes); continue; } @@ -1003,7 +1005,11 @@ void* mptThreadFunc(void* param) { if (atomic_load_8(&pJobCtx->pJob->retired)) { break; } - mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m); + + int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES)); + + MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, m, pJobCtx->taskNum); + mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, actTimes); } taosRUnLockLatch(&pJobCtx->jobExecLock);