fix(stream): fix invalid unlock.

This commit is contained in:
Haojun Liao 2024-05-15 10:08:56 +08:00
parent 610aa80e65
commit 042ed3caff
1 changed files with 7 additions and 8 deletions

View File

@ -257,19 +257,18 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
return 0; return 0;
} }
STaskDbWrapper* pBackend = taskDbOpen(pMeta->path, key, chkpId); STaskDbWrapper* pBackend = NULL;
while (1) { while (1) {
if (pBackend == NULL) { pBackend = taskDbOpen(pMeta->path, key, chkpId);
taosThreadMutexUnlock(&pMeta->backendMutex); if (pBackend != NULL) {
taosMsleep(1000);
stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
} else {
taosThreadMutexUnlock(&pMeta->backendMutex);
break; break;
} }
taosThreadMutexUnlock(&pMeta->backendMutex);
taosMsleep(1000);
stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
taosThreadMutexLock(&pMeta->backendMutex); taosThreadMutexLock(&pMeta->backendMutex);
pBackend = taskDbOpen(pMeta->path, key, chkpId);
} }
int64_t tref = taosAddRef(taskDbWrapperId, pBackend); int64_t tref = taosAddRef(taskDbWrapperId, pBackend);