From 8e042e34cb01651c4a68538564917d65fa01030b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 09:48:14 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 54 ++++++++++++++++------------------ 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ee76a27414..4834924fe0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -699,7 +699,23 @@ end: return ret; } -void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } +static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } + +static STaskId replaceStreamTaskId(SStreamTask* pTask) { + ASSERT(pTask->info.fillHistory); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + + return id; +} + +static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { + ASSERT(pTask->info.fillHistory); + pTask->streamTaskId.taskId = pId->taskId; + pTask->streamTaskId.streamId = pId->streamId; +} int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t vgId = TD_VID(pTq->pVnode); @@ -713,15 +729,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { streamTaskOpenAllUpstreamInput(pTask); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - SStreamTask* pStateTask = pTask; - - STaskId taskId = {.streamId = 0, .taskId = 0}; + STaskId taskId = {0}; if (pTask->info.fillHistory) { - taskId.streamId = pTask->id.streamId; - taskId.taskId = pTask->id.taskId; - - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; + taskId = replaceStreamTaskId(pTask); } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -731,9 +741,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } else { tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } + if (pTask->info.fillHistory) { - pTask->id.streamId = taskId.streamId; - pTask->id.taskId = taskId.taskId; + restoreStreamTaskId(pTask, &taskId); } SReadHandle handle = { @@ -754,15 +764,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - SStreamTask* pSateTask = pTask; - // SStreamTask task = {0}; - - STaskId taskId = {.streamId = 0, .taskId = 0}; + STaskId taskId = {0}; if (pTask->info.fillHistory) { - taskId.streamId = pTask->id.streamId; - taskId.taskId = pTask->id.taskId; - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; + taskId = replaceStreamTaskId(pTask); } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -774,15 +778,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } if (pTask->info.fillHistory) { - pTask->id.streamId = taskId.streamId; - pTask->id.taskId = taskId.taskId; + restoreStreamTaskId(pTask, &taskId); } - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId, .vnode = NULL, - .numOfVgroups = numOfVgroups, + .numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory, .winRange = pTask->dataRange.window, @@ -828,12 +830,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); } - // // reset the task status from unfinished transaction - // if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - // tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr); - // pTask->status.taskStatus = TASK_STATUS__READY; - // } - streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo;