From 1a8583887f471ca7e87128a7c6c4e17517d8d8a1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Dec 2023 15:35:43 +0800 Subject: [PATCH] fix(stream): remove clear backendcache. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +++-- source/libs/stream/src/streamMeta.c | 4 ++-- source/libs/stream/src/streamStart.c | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2201f0744e..0864afdf2c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -686,7 +686,8 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaWLock(pMeta); if (pMeta->startInfo.taskStarting == 1) { pMeta->startInfo.restartCount += 1; - tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, %d", vgId, pMeta->startInfo.restartCount); + tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, + pMeta->startInfo.restartCount); streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; } @@ -720,7 +721,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { if (isLeader && !tsDisableStream) { tqStreamTaskResetStatus(pMeta); streamMetaWUnLock(pMeta); - tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + tqInfo("vgId:%d start all stream tasks after reload tasks from disk", vgId); startStreamTasks(pMeta); } else { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2bbc1607b3..b67b81b611 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -266,11 +266,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { if (pBackend == NULL) { taosThreadMutexUnlock(&pMeta->backendMutex); taosMsleep(1000); - stDebug("backed holded by other task, restart later, path: %s, key: %s", pMeta->path, key); + stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key); } else { taosThreadMutexUnlock(&pMeta->backendMutex); break; } + taosThreadMutexLock(&pMeta->backendMutex); pBackend = taskDbOpen(pMeta->path, key, chkpId); } @@ -448,7 +449,6 @@ void streamMetaClear(SStreamMeta* pMeta) { taosRemoveRef(streamBackendId, pMeta->streamBackendRid); taosHashClear(pMeta->pTasksMap); - taosHashClear(pMeta->pTaskDbUnique); taosArrayClear(pMeta->pTaskList); taosArrayClear(pMeta->chkpSaved); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 68380e4ba4..5ce7668048 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1113,7 +1113,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI pStartInfo->completeFn(pMeta); } else { streamMetaWUnLock(pMeta); - stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); + stDebug("vgId:%d recv check downstream results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); } return TSDB_CODE_SUCCESS;