diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 22a176f0bb..93e0064192 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -40,4 +40,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); +int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode); + #endif // TDENGINE_TQ_COMMON_H diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 024c3c6bae..8c5d056893 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -31,7 +31,6 @@ extern "C" { #endif struct SSnode { - char* path; SStreamMeta* pMeta; SMsgCb msgCb; }; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3bef5b595b..bd07974c3f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -32,6 +32,7 @@ static STaskId replaceStreamTaskId(SStreamTask *pTask) { pTask->id.taskId = pTask->streamTaskId.taskId; return id; } + static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { ASSERT(pTask->info.fillHistory); pTask->id.taskId = pId->taskId; @@ -48,42 +49,16 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer streamTaskOpenAllUpstreamInput(pTask); - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); + code = tqExpandStreamTask(pTask, pSnode->pMeta, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } - pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - sndError("s-task:%s failed to open state for task", pTask->id.idStr); - return -1; - } else { - sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - } - - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = NULL, - .numOfVgroups = numOfVgroups, - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - initStreamStateAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId); - ASSERT(pTask->exec.pExecutor); - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); - streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); SCheckpointInfo *pChkInfo = &pTask->chkInfo; + // checkpoint ver is the kept version, handled data should be the next version. if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; @@ -117,11 +92,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pSnode->path = taosStrdup(path); - if (pSnode->path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } pSnode->msgCb = pOption->msgCb; pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); @@ -140,7 +110,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return pSnode; FAIL: - taosMemoryFree(pSnode->path); taosMemoryFree(pSnode); return NULL; } @@ -156,7 +125,6 @@ void sndClose(SSnode *pSnode) { streamMetaNotifyClose(pSnode->pMeta); streamMetaCommit(pSnode->pMeta); streamMetaClose(pSnode->pMeta); - taosMemoryFree(pSnode->path); taosMemoryFree(pSnode); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7886967be0..fb47414fb9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -699,22 +699,6 @@ end: static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } -static STaskId replaceStreamTaskId(SStreamTask* pTask) { - ASSERT(pTask->info.fillHistory); - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; - - return id; -} - -static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { - ASSERT(pTask->info.fillHistory); - pTask->id.taskId = pId->taskId; - pTask->id.streamId = pId->streamId; -} - int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); @@ -724,74 +708,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { return code; } - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); - } - - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId); - return -1; - } - - tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = pTq->pVnode, - .initTqReader = 1, - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - - initStorageAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - return -1; - } - - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); - } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); - } - - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId); - return -1; - } else { - tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - } - - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = NULL, - .numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - - initStorageAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - return -1; - } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); + code = tqExpandStreamTask(pTask, pTq->pStreamMeta, pTq->pVnode); + if (code != TSDB_CODE_SUCCESS) { + return code; } // sink @@ -827,6 +746,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); + SCheckpointInfo* pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2fa9f9a9ff..f85bb8cee5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -23,6 +23,67 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; +static STaskId replaceStreamTaskId(SStreamTask* pTask) { + ASSERT(pTask->info.fillHistory); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + + return id; +} + +static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { + ASSERT(pTask->info.fillHistory); + pTask->id.taskId = pId->taskId; + pTask->id.streamId = pId->streamId; +} + +int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) { + int32_t vgId = pMeta->vgId; + STaskId taskId = {0}; + + if (pTask->info.fillHistory) { + taskId = replaceStreamTaskId(pTask); + } + + pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); + if (pTask->pState == NULL) { + tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); + return -1; + } else { + tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); + } + + if (pTask->info.fillHistory) { + restoreStreamTaskId(pTask, &taskId); + } + + SReadHandle handle = { + .checkpointId = pTask->chkInfo.checkpointId, + .pStateBackend = pTask->pState, + .fillHistory = pTask->info.fillHistory, + .winRange = pTask->dataRange.window, + }; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + handle.vnode = pVnode; + handle.initTqReader = 1; + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); + } + + initStorageAPI(&handle.api); + + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); + return -1; + } + + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); + return TSDB_CODE_SUCCESS; +} + int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);