From c43a6b272c59ee3100e6b593f82998988a57a2da Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Aug 2023 02:09:50 +0000 Subject: [PATCH] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d72901a2a3..8492410a46 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -221,6 +221,19 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list bm->preCkptId = chkpId; bm->curChkpId = chkpId; bm->init = 1; + + SArray* add = taosArrayInit(64, sizeof(void*)); + + void* pIter = taosHashIterate(pTable, NULL); + while (pIter) { + size_t len; + char* name = taosHashGetKey(pIter, &len); + if (name != NULL && len != 0) { + taosArrayPush(add, &name); + } + pIter = taosHashIterate(pTable, pIter); + } + } else { SArray* add = taosArrayInit(64, sizeof(void*)); SArray* del = taosArrayInit(64, sizeof(void*)); @@ -233,6 +246,20 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list return 0; } +int32_t backendManagerDumpTo(SBackendManager* bm, char* name) { + int32_t code = 0; + char* buf = taosMemoryCalloc(1, strlen(bm->path) + 64); + sprintf(buf, "%s%s%s", bm->path, TD_DIRSEP, name); + + code = taosMkDir(buf); + if (code != 0) { + return code; + } + + + +} + SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, destroyFunc, encodeValueFunc, decodeValueFunc},