From 0840e6a0d8075df5f59a093af41892f50ce9fb31 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 18 Dec 2023 11:19:15 +0800 Subject: [PATCH] fix stream backend crash when transfer --- source/libs/stream/src/streamMeta.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 23cb6f5a35..c06dae76d7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -260,9 +260,17 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { } STaskDbWrapper* pBackend = taskDbOpen(pMeta->path, key, chkpId); - if (pBackend == NULL) { - taosThreadMutexUnlock(&pMeta->backendMutex); - return -1; + while (1) { + if (pBackend == NULL) { + taosThreadMutexUnlock(&pMeta->backendMutex); + taosMsleep(1000); + stDebug("backed holded by other task, restart later, path: %s, key: %s", pMeta->path, key); + } else { + taosThreadMutexUnlock(&pMeta->backendMutex); + break; + } + taosThreadMutexLock(&pMeta->backendMutex); + pBackend = taskDbOpen(pMeta->path, key, chkpId); } int64_t tref = taosAddRef(taskDbWrapperId, pBackend); @@ -794,7 +802,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { } int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { - TBC* pCur = NULL; + TBC* pCur = NULL; void* pKey = NULL; int32_t kLen = 0; void* pVal = NULL;