From 100d2240c35f5e68d47d4e21cc3c5e8075c28fda Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Aug 2023 06:35:25 +0000 Subject: [PATCH] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 14c94a7996..69ba44b612 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -28,14 +28,17 @@ typedef struct { int64_t preCkptId; int64_t curChkpId; char* path; + char* buf; int32_t len; + // ping-pong buf SHashObj* pSstTbl[2]; - SArray* pAdd; - SArray* pDel; int8_t idx; - int8_t update; + + SArray* pAdd; + SArray* pDel; + int8_t update; } SBackendManager; typedef struct SCompactFilteFactory { @@ -258,6 +261,15 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1; } else { int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); + if (code != 0) { + // dead code + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + taosHashClear(bm->pSstTbl[1 - bm->idx]); + bm->update = 0; + + return code; + } bm->preCkptId = bm->curChkpId; bm->curChkpId = chkpId; @@ -714,11 +726,16 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { if (!taosDirExist(chkpPath)) { // no checkpoint, nothing to load + taosMemoryFree(chkpPath); return 0; } TdDirPtr pDir = taosOpenDir(chkpPath); - if (pDir == NULL) return 0; + + if (pDir == NULL) { + taosMemoryFree(chkpPath); + return 0; + } TdDirEntryPtr de = NULL; SArray* suffix = taosArrayInit(4, sizeof(int64_t));