From d41fd271893477cbea05d9b171859db58e6c4c6f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Nov 2023 11:57:21 +0800 Subject: [PATCH] fix(stream): restart tasks in stream threads, instead of write thread. --- include/libs/stream/tstream.h | 3 +- source/dnode/vnode/src/inc/tq.h | 7 +-- source/dnode/vnode/src/inc/vnodeInt.h | 7 ++- source/dnode/vnode/src/tq/tq.c | 51 ++++-------------- source/dnode/vnode/src/tq/tqStreamTask.c | 68 ++++++++++++++++++++++-- source/dnode/vnode/src/vnd/vnodeSync.c | 4 +- source/libs/stream/src/streamMeta.c | 12 ++--- 7 files changed, 91 insertions(+), 61 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0605f4b2e5..4dc1b9315e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -432,7 +432,8 @@ struct SStreamTask { typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; - int32_t startAllTasksFlag; + int32_t tasksWillRestart; + int32_t taskRestarting; // restart flag, sentinel to guard the restart procedure. SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing int32_t elapsedTime; } STaskStartInfo; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index cd285fe6a1..4e4431d4d9 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -43,9 +43,9 @@ extern "C" { typedef struct STqOffsetStore STqOffsetStore; -// tqPush #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) -#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2) +#define STREAM_EXEC_START_ALL_TASKS_ID (-2) +#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3) // tqExec typedef struct { @@ -155,9 +155,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); -int32_t tqScanWal(STQ* pTq); -int32_t tqStartStreamTask(STQ* pTq); int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f184702eda..724f3bc92e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -231,7 +231,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqLaunchStreamTaskAsync(STQ* pTq); + +int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart); +int32_t tqRestartStreamTasks(STQ* pTq); +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); +int32_t tqScanWal(STQ* pTq); +int32_t tqStartStreamTasks(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2e6844736c..05fa029f32 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1307,14 +1307,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t taskId = pReq->taskId; int32_t vgId = TD_VID(pTq->pVnode); - if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) { - tqStartStreamTask(pTq); - return 0; - } - if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal tqScanWal(pTq); return 0; + } else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { + tqStartStreamTasks(pTq); + return 0; + } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { + tqRestartStreamTasks(pTq); + return 0; } SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); @@ -1900,7 +1901,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - pMeta->startInfo.startAllTasksFlag = 1; + pMeta->startInfo.tasksWillRestart = 1; if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, @@ -1909,45 +1910,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->startInfo.startAllTasksFlag = 0; + pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { - tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId); - terrno = 0; - - streamMetaWUnLock(pMeta); - - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - - int32_t code = streamMetaReopen(pMeta); - if (code != 0) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - tqError("vgId:%d failed to load stream tasks", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); - tqResetStreamTaskStatus(pTq); - tqLaunchStreamTaskAsync(pTq); - } else { - tqInfo("vgId:%d, follower node not start stream tasks", vgId); - } - streamMetaWUnLock(pMeta); + tqStartStreamTaskAsync(pTq, true); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 2d94f23009..f7607dabab 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -58,7 +58,7 @@ int32_t tqScanWal(STQ* pTq) { return 0; } -int32_t tqStartStreamTask(STQ* pTq) { +int32_t tqStartStreamTasks(STQ* pTq) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -115,7 +115,67 @@ int32_t tqStartStreamTask(STQ* pTq) { return code; } -int32_t tqLaunchStreamTaskAsync(STQ* pTq) { +int32_t tqRestartStreamTasks(STQ* pTq) { + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = pMeta->vgId; + int32_t code = 0; + int64_t st = taosGetTimestampMs(); + + while(1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskRestarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + + terrno = 0; + tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, + pMeta->updateInfo.transId); + + while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + + code = streamMetaReopen(pMeta); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + + int64_t el = taosGetTimestampMs() - st; + + tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); + + code = streamMetaLoadAllTasks(pTq->pStreamMeta); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + tqResetStreamTaskStatus(pTq); + tqStartStreamTasks(pTq); + } else { + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + + streamMetaWUnLock(pMeta); + code = terrno; + return code; +} + +int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; @@ -132,10 +192,10 @@ int32_t tqLaunchStreamTaskAsync(STQ* pTq) { return -1; } - tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks); + tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; - pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID; + pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index b4afd898fa..d4876caf15 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; streamMetaWLock(pMeta); - if (pMeta->startInfo.startAllTasksFlag) { + if (pMeta->startInfo.tasksWillRestart) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); streamMetaWUnLock(pMeta); return; @@ -567,7 +567,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); tqResetStreamTaskStatus(pVnode->pTq); - tqLaunchStreamTaskAsync(pVnode->pTq); + tqStartStreamTaskAsync(pVnode->pTq, false); } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7dfe88e6de..ee02ccf8f1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -228,12 +228,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } int32_t streamMetaReopen(SStreamMeta* pMeta) { - // backup the restart flag - int32_t restartFlag = pMeta->startInfo.startAllTasksFlag; streamMetaClear(pMeta); - pMeta->startInfo.startAllTasksFlag = restartFlag; - // NOTE: role should not be changed during reopen meta pMeta->streamBackendRid = -1; pMeta->streamBackend = NULL; @@ -302,7 +298,9 @@ void streamMetaClear(SStreamMeta* pMeta) { pMeta->numOfPausedTasks = 0; pMeta->chkptNotReadyTasks = 0; - streamMetaResetStartInfo(&pMeta->startInfo); + // the willrestart/starting flag can NOT be cleared + taosHashClear(pMeta->startInfo.pReadyTaskSet); + pMeta->startInfo.readyTs = 0; } void streamMetaClose(SStreamMeta* pMeta) { @@ -1093,8 +1091,10 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) { void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); - pStartInfo->startAllTasksFlag = 0; + pStartInfo->tasksWillRestart = 0; pStartInfo->readyTs = 0; + // reset the sentinel flag value to be 0 + atomic_store_32(&pStartInfo->taskRestarting, 0); } void streamMetaRLock(SStreamMeta* pMeta) {