From a021b62c04769e2bf3e2bcf61f61f5d0e32b71ba Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 23 May 2024 12:02:01 +0000 Subject: [PATCH] fix backen crash --- source/libs/stream/src/streamBackendRocksdb.c | 17 +++++++++++------ source/libs/stream/src/streamSnapshot.c | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 2e65c9ef16..b157597e60 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1155,10 +1155,13 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { taskDbAddRef(pTaskDb); int64_t chkpId = pTaskDb->chkpId; + taskDbRefChkp(pTaskDb, chkpId); code = taskDbDoCheckpoint(pTaskDb, chkpId); - taskDbRemoveRef(pTaskDb); + if (code != 0) { + taskDbUnRefChkp(pTaskDb, chkpId); + } - taskDbRefChkp(pTaskDb, pTaskDb->chkpId); + taskDbRemoveRef(pTaskDb); SStreamTask* pTask = pTaskDb->pTask; SStreamTaskSnap snap = {.streamId = pTask->id.streamId, @@ -1182,14 +1185,15 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) { for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i); sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId); - STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf)); - if (pTaskDb == NULL) { + STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf)); + if (pTaskDb == NULL || *pTaskDb == NULL) { stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId); + memset(buf, 0, sizeof(buf)); continue; } memset(buf, 0, sizeof(buf)); - taskDbUnRefChkp(pTaskDb, pSnap->chkpId); + taskDbUnRefChkp(*pTaskDb, pSnap->chkpId); } taosThreadMutexUnlock(&pMeta->backendMutex); return 0; @@ -1989,7 +1993,8 @@ void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); - for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) { + int32_t size = taosArrayGetSize(pTaskDb->chkpInUse); + for (int i = 0; i < size; i++) { int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i); if (*p == chkp) { taosArrayRemove(pTaskDb->chkpInUse, i); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index eb67e61c4c..25015c4d33 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -144,7 +144,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } -int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); } +int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); } void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { @@ -333,7 +333,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { } taosArrayDestroy(handle->pDbSnapSet); } - streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet); + streamDestroyTaskDbSnapInfo(handle->pMeta, handle->pSnapInfoSet); if (handle->pSnapInfoSet) { for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);