From 4ca39d0f3c4c2224a8b371381f1177d9af89dcd2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 01:45:16 +0000 Subject: [PATCH] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 2e9032a47e..f24186c673 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -299,11 +299,12 @@ _EXIT: } void streamBackendCleanup(void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)arg; - RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); + + void* pIter = taosHashIterate(pHandle->cfInst, NULL); while (pIter != NULL) { - RocksdbCfInst* inst = *pIter; + RocksdbCfInst* inst = *(RocksdbCfInst**)pIter; destroyRocksdbCfInst(inst); - taosHashIterate(pHandle->cfInst, pIter); + pIter = taosHashIterate(pHandle->cfInst, pIter); } taosHashCleanup(pHandle->cfInst); @@ -1103,7 +1104,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); for (int i = 0; i < nCf; i++) { char* cf = cfs[i]; - if (i == 0) continue; + if (i == 0) continue; // skip default column family, not set opt + char funcname[64] = {0}; if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { char idstr[128] = {0}; @@ -1125,7 +1127,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - // rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1136,9 +1137,9 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pHandle[idx] = cfHandle[i]; } } - void** pIter = taosHashIterate(handle->cfInst, NULL); + void* pIter = taosHashIterate(handle->cfInst, NULL); while (pIter) { - RocksdbCfInst* inst = *pIter; + RocksdbCfInst* inst = *(RocksdbCfInst**)pIter; for (int i = 0; i < cfLen; i++) { if (inst->cfOpt[i] == NULL) { @@ -1179,8 +1180,8 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); - taosThreadMutexLock(&handle->cfMutex); + taosThreadMutexLock(&handle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { RocksdbCfInst* inst = *ppInst;