fix(stream): remove clear backendcache.

This commit is contained in:
Haojun Liao 2023-12-20 15:35:43 +08:00
parent d25a323a4c
commit 1a8583887f
3 changed files with 6 additions and 5 deletions

View File

@ -686,7 +686,8 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pMeta->startInfo.taskStarting == 1) { if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1; pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, %d", vgId, pMeta->startInfo.restartCount); tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -720,7 +721,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
if (isLeader && !tsDisableStream) { if (isLeader && !tsDisableStream) {
tqStreamTaskResetStatus(pMeta); tqStreamTaskResetStatus(pMeta);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqInfo("vgId:%d start all stream tasks after reload tasks from disk", vgId);
startStreamTasks(pMeta); startStreamTasks(pMeta);
} else { } else {

View File

@ -266,11 +266,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
if (pBackend == NULL) { if (pBackend == NULL) {
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
taosMsleep(1000); taosMsleep(1000);
stDebug("backed holded by other task, restart later, path: %s, key: %s", pMeta->path, key); stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
} else { } else {
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
break; break;
} }
taosThreadMutexLock(&pMeta->backendMutex); taosThreadMutexLock(&pMeta->backendMutex);
pBackend = taskDbOpen(pMeta->path, key, chkpId); pBackend = taskDbOpen(pMeta->path, key, chkpId);
} }
@ -448,7 +449,6 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosRemoveRef(streamBackendId, pMeta->streamBackendRid); taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasksMap); taosHashClear(pMeta->pTasksMap);
taosHashClear(pMeta->pTaskDbUnique);
taosArrayClear(pMeta->pTaskList); taosArrayClear(pMeta->pTaskList);
taosArrayClear(pMeta->chkpSaved); taosArrayClear(pMeta->chkpSaved);

View File

@ -1113,7 +1113,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
pStartInfo->completeFn(pMeta); pStartInfo->completeFn(pMeta);
} else { } else {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); stDebug("vgId:%d recv check downstream results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;