From 1c562bd535215f8292f993c8253501302ec84cee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jan 2024 09:41:40 +0800 Subject: [PATCH] refactor: enable the restart of stream tasks. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 15 ++++++++++++--- source/libs/stream/src/streamMeta.c | 1 + 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2e83cd5bc5..b7b3893dfe 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -729,9 +729,9 @@ int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) { SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); streamTaskResetStatus(*pTask); - if ((*pTask)->info.fillHistory == 1) { - streamResetParamForScanHistory(*pTask); - } +// if ((*pTask)->info.fillHistory == 1) { +// streamResetParamForScanHistory(*pTask); +// } } return 0; @@ -764,10 +764,19 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } streamMetaWLock(pMeta); + streamMetaClear(pMeta); int64_t el = taosGetTimestampMs() - st; tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); + code = streamMetaLoadAllTasks(pMeta); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + { STaskStartInfo* pStartInfo = &pMeta->startInfo; taosHashClear(pStartInfo->pReadyTaskSet); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 30a40ffc9f..6bb90a9c25 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -469,6 +469,7 @@ void streamMetaClear(SStreamMeta* pMeta) { taosArrayClear(pMeta->pTaskList); taosArrayClear(pMeta->chkpSaved); taosArrayClear(pMeta->chkpInUse); + pMeta->numOfStreamTasks = 0; pMeta->numOfPausedTasks = 0;