fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-07-22 00:20:02 +08:00
parent 1b2636028a
commit bfc4a07207
2 changed files with 9 additions and 5 deletions

View File

@ -1114,6 +1114,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
streamTaskEnablePause(pTask);
SVersionRange* pRange = NULL; SVersionRange* pRange = NULL;
SStreamTask* pStreamTask = NULL; SStreamTask* pStreamTask = NULL;

View File

@ -204,11 +204,6 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
} }
// enable pause when init completed.
if (pTask->historyTaskId.taskId == 0) {
streamTaskEnablePause(pTask);
}
// when current stream task is ready, check the related fill history task. // when current stream task is ready, check the related fill history task.
launchFillHistoryTask(pTask); launchFillHistoryTask(pTask);
} }
@ -477,6 +472,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
} }
streamNotifyUpstreamContinue(pTask); streamNotifyUpstreamContinue(pTask);
// sink node does not receive the pause msg from mnode
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamTaskEnablePause(pTask);
}
} else { } else {
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left); pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left);
@ -497,6 +497,8 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
streamTaskEnablePause(pTask);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamSchedExec(pTask); streamSchedExec(pTask);
} }