diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 76d89be802..675bfa334a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -43,9 +43,9 @@ extern "C" { typedef struct STqOffsetStore STqOffsetStore; -// tqPush #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) -#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2) +#define STREAM_EXEC_START_ALL_TASKS_ID (-2) +#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3) #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) // tqExec @@ -156,9 +156,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); -int32_t tqScanWal(STQ* pTq); -int32_t tqStartStreamTask(STQ* pTq); int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 89d70bfabb..2a9510f6ad 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -231,7 +231,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqLaunchStreamTaskAsync(STQ* pTq); + +int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart); +int32_t tqRestartStreamTasks(STQ* pTq); +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); +int32_t tqScanWal(STQ* pTq); +int32_t tqStartStreamTasks(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bb4d7d42b9..1c1a4a192c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1317,14 +1317,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t taskId = pReq->taskId; int32_t vgId = TD_VID(pTq->pVnode); - if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) { - tqStartStreamTask(pTq); - return 0; - } - if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal tqScanWal(pTq); return 0; + } else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { + tqStartStreamTasks(pTq); + return 0; + } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { + tqRestartStreamTasks(pTq); + return 0; } SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); @@ -1911,7 +1912,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - pMeta->startInfo.startAllTasksFlag = 1; + pMeta->startInfo.tasksWillRestart = 1; if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, @@ -1920,45 +1921,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->startInfo.startAllTasksFlag = 0; + pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { - tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId); - terrno = 0; - - streamMetaWUnLock(pMeta); - - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - - int32_t code = streamMetaReopen(pMeta); - if (code != 0) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - tqError("vgId:%d failed to load stream tasks", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); - tqResetStreamTaskStatus(pTq); - tqLaunchStreamTaskAsync(pTq); - } else { - tqInfo("vgId:%d, follower node not start stream tasks", vgId); - } - streamMetaWUnLock(pMeta); + tqStartStreamTaskAsync(pTq, true); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 52e67ef456..5f7a33499b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -60,7 +60,7 @@ int32_t tqScanWal(STQ* pTq) { return 0; } -int32_t tqStartStreamTask(STQ* pTq) { +int32_t tqStartStreamTasks(STQ* pTq) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -125,7 +125,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { int64_t st = taosGetTimestampMs(); while(1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskRestarting, 0, 1); if (startVal == 0) { break; } @@ -155,7 +155,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); + tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); code = streamMetaLoadAllTasks(pTq->pStreamMeta); if (code != TSDB_CODE_SUCCESS) { @@ -168,15 +168,12 @@ int32_t tqRestartStreamTasks(STQ* pTq) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqResetStreamTaskStatus(pTq); - - streamMetaWUnLock(pMeta); tqStartStreamTasks(pTq); } else { - streamMetaResetStartInfo(&pMeta->startInfo); - streamMetaWUnLock(pMeta); tqInfo("vgId:%d, follower node not start stream tasks", vgId); } + streamMetaWUnLock(pMeta); code = terrno; return code; } @@ -198,10 +195,10 @@ int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) { return -1; } - tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks); + tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; - pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID; + pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 4a0c987e57..3944f8ed91 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -557,7 +557,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; streamMetaWLock(pMeta); - if (pMeta->startInfo.startAllTasksFlag) { + if (pMeta->startInfo.tasksWillRestart) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); streamMetaWUnLock(pMeta); return; @@ -570,7 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); tqResetStreamTaskStatus(pVnode->pTq); - tqLaunchStreamTaskAsync(pVnode->pTq); + tqStartStreamTaskAsync(pVnode->pTq, false); } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);