From 3bcd038b7d87a18edf1bac625f34a444ddee14dc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 Jan 2024 14:04:36 +0800 Subject: [PATCH] refactor: not reload info from disk when trying to restart stream tasks. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 50 +++++++++++++--------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 9a3fdbe512..5467e3dadd 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -25,7 +25,6 @@ typedef struct STaskUpdateEntry { int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); @@ -131,7 +130,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL || *ppTask == NULL) { - tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, + tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", vgId, req.taskId); rsp.code = TSDB_CODE_SUCCESS; streamMetaWUnLock(pMeta); @@ -141,21 +140,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } SStreamTask* pTask = *ppTask; + const char* idstr = pTask->id.idStr; if (pMeta->updateInfo.transId != req.transId) { pMeta->updateInfo.transId = req.transId; - tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); + tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", idstr, req.transId); // info needs to be kept till the new trans to update the nodeEp arrived. taosHashClear(pMeta->updateInfo.pTasks); } else { - tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); + tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId); } STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; - void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); - if (exist != NULL) { - tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, - req.transId); + + void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); + if (pReqTask != NULL) { + tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", idstr, vgId, req.transId); rsp.code = TSDB_CODE_SUCCESS; streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); @@ -171,7 +171,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); if (ppHTask == NULL || *ppHTask == NULL) { tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", - pMeta->vgId, req.taskId); + vgId, req.taskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } else { tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); @@ -180,7 +180,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } if (restored) { - tqDebug("s-task:%s vgId:%d start to save task", pTask->id.idStr, vgId); + tqDebug("s-task:%s vgId:%d start to save task", idstr, vgId); streamMetaSaveTask(pMeta, pTask); if (ppHTask != NULL) { streamMetaSaveTask(pMeta, *ppHTask); @@ -192,10 +192,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } #endif } else { - tqDebug("s-task:%s vgId:%d not save since restore not finish", pTask->id.idStr, vgId); + tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId); } - tqDebug("s-task:%s vgId:%d start to stop task after save task", pTask->id.idStr, vgId); + tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId); streamTaskStop(pTask); // keep the already handled info @@ -206,11 +206,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int64_t now = taosGetTimestampMs(); tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask/fill-history closed, elapsed:%" PRId64 " ms", - pTask->id.idStr, vgId, now - st); + idstr, vgId, now - st); taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); } else { int64_t now = taosGetTimestampMs(); - tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", pTask->id.idStr, + tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", idstr, vgId, now - st); } @@ -237,6 +237,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // for test purpose, to trigger the leader election taosMSleep(5000); #endif + tqStreamTaskStartAsync(pMeta, cb, true); streamMetaWUnLock(pMeta); } @@ -759,17 +760,24 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } streamMetaWLock(pMeta); - streamMetaClear(pMeta); +// streamMetaClear(pMeta); int64_t el = taosGetTimestampMs() - st; tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); - code = streamMetaLoadAllTasks(pMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); - streamMetaWUnLock(pMeta); - code = terrno; - return code; +// code = streamMetaLoadAllTasks(pMeta); +// if (code != TSDB_CODE_SUCCESS) { +// tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); +// streamMetaWUnLock(pMeta); +// code = terrno; +// return code; +// } + + { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + taosHashClear(pStartInfo->pReadyTaskSet); + taosHashClear(pStartInfo->pFailedTaskSet); + pStartInfo->readyTs = 0; } if (isLeader && !tsDisableStream) {