fix: data deleter schema issue

This commit is contained in:
dapan1121 2024-11-14 17:38:20 +08:00
parent 0995facd64
commit 40f9e366e9
3 changed files with 20 additions and 11 deletions

View File

@ -53,7 +53,7 @@ typedef struct SDataSinkHandle {
} SDataSinkHandle;
int32_t createDataDispatcher(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam);
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam);

View File

@ -258,6 +258,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
taosCloseQueue(pDeleter->pDataBlocks);
(void)taosThreadMutexDestroy(&pDeleter->mutex);
nodesDestroyNode((SNode*)pDeleter->pSchema);
pDeleter->pSchema = NULL;
taosMemoryFree(pDeleter->pManager);
@ -280,7 +282,7 @@ static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
}
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam) {
int32_t code = TSDB_CODE_SUCCESS;
if (pParam == NULL) {
@ -310,6 +312,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
deleter->pManager = pManager;
deleter->pDeleter = pDeleterNode;
deleter->pSchema = pDataSink->pInputDataBlockDesc;
pDataSink->pInputDataBlockDesc = NULL;
deleter->pParam = pParam;
deleter->status = DS_BUF_EMPTY;

View File

@ -392,7 +392,7 @@ void mptInitJob(int32_t idx) {
pJobCtx->jobIdx = idx;
pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1);
pJobCtx->taskNum = (mptCtrl.jobTaskNum) ? mptCtrl.jobTaskNum : (taosRand() % MPT_MAX_SESSION_NUM) + 1;
pJobCtx->taskNum = (mptCtrl.jobTaskNum) ? mptCtrl.jobTaskNum : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_SESSION_NUM/10)) : (taosRand() % MPT_MAX_SESSION_NUM)) + 1;
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
mptInitTask(i, 0, pJobCtx);
assert(pJobCtx->pJob);
@ -853,8 +853,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
}
}
void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : taosRand() % MPT_MAX_MEM_ACT_TIMES;
void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask, int32_t actTimes) {
uDebug("JOB:0x%x TASK:0x%x will start total %d actions", pJobCtx->jobId, pTask->taskId, actTimes);
for (int32_t i = 0; i < actTimes; ++i) {
@ -863,6 +862,8 @@ void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
return;
}
//MPT_PRINTF("\tTASK:0x%x will start %d:%d actions\n", pTask->taskId, i, actTimes);
mptSimulateAction(pJobCtx, pTask);
}
}
@ -894,7 +895,7 @@ void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
}
void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) {
void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx, int32_t actTimes) {
uDebug("JOB:0x%x TASK:0x%x start running", pJobCtx->jobId, pCtx->taskId);
if (atomic_load_8(&pJobCtx->pJob->retired)) {
@ -910,7 +911,7 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) {
atomic_add_fetch_32(&pJobCtx->taskRunningNum, 1);
mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[idx]);
mptSimulateTask(pJobCtx, pCtx);
mptSimulateTask(pJobCtx, pCtx, actTimes);
mptDisableMemoryPoolUsage();
if (!atomic_load_8(&pJobCtx->pJob->retired)) {
@ -979,11 +980,11 @@ void* mptThreadFunc(void* param) {
int32_t jobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
for (int32_t n = 0; n < jobExecTimes; ++n) {
MPT_PRINTF("Thread %d start the %d:%d job loops\n", pThread->idx, n, jobExecTimes);
MPT_PRINTF("Thread %d start the %d:%d job loops - jobNum:%d\n", pThread->idx, n, jobExecTimes, jobNum);
for (int32_t i = 0; i < jobNum; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
MPT_PRINTF(" Thread %d start %dth JOB 0x%x exec\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId);
MPT_PRINTF(" Thread %d start to run %d:%d job[%d:0x%" PRIx64 "]\n", pThread->idx, i, jobNum, pJobCtx->jobIdx, pJobCtx->jobId);
if (mptResetJob(pJobCtx)) {
continue;
@ -994,8 +995,9 @@ void* mptThreadFunc(void* param) {
}
if (mptCtx.param.randTask) {
int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES));
int32_t taskIdx = taosRand() % pJobCtx->taskNum;
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx);
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes);
continue;
}
@ -1003,7 +1005,11 @@ void* mptThreadFunc(void* param) {
if (atomic_load_8(&pJobCtx->pJob->retired)) {
break;
}
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m);
int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES));
MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, m, pJobCtx->taskNum);
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, actTimes);
}
taosRUnLockLatch(&pJobCtx->jobExecLock);