support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-07 01:45:16 +00:00
parent f9d0874e45
commit 4ca39d0f3c
1 changed files with 9 additions and 8 deletions

View File

@ -299,11 +299,12 @@ _EXIT:
} }
void streamBackendCleanup(void* arg) { void streamBackendCleanup(void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)arg; SBackendWrapper* pHandle = (SBackendWrapper*)arg;
RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL);
void* pIter = taosHashIterate(pHandle->cfInst, NULL);
while (pIter != NULL) { while (pIter != NULL) {
RocksdbCfInst* inst = *pIter; RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
destroyRocksdbCfInst(inst); destroyRocksdbCfInst(inst);
taosHashIterate(pHandle->cfInst, pIter); pIter = taosHashIterate(pHandle->cfInst, pIter);
} }
taosHashCleanup(pHandle->cfInst); taosHashCleanup(pHandle->cfInst);
@ -1103,7 +1104,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
for (int i = 0; i < nCf; i++) { for (int i = 0; i < nCf; i++) {
char* cf = cfs[i]; char* cf = cfs[i];
if (i == 0) continue; if (i == 0) continue; // skip default column family, not set opt
char funcname[64] = {0}; char funcname[64] = {0};
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
char idstr[128] = {0}; char idstr[128] = {0};
@ -1125,7 +1127,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
inst->dbOpt = handle->dbOpt; inst->dbOpt = handle->dbOpt;
// rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
} else { } else {
inst = *pInst; inst = *pInst;
@ -1136,9 +1137,9 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->pHandle[idx] = cfHandle[i]; inst->pHandle[idx] = cfHandle[i];
} }
} }
void** pIter = taosHashIterate(handle->cfInst, NULL); void* pIter = taosHashIterate(handle->cfInst, NULL);
while (pIter) { while (pIter) {
RocksdbCfInst* inst = *pIter; RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
if (inst->cfOpt[i] == NULL) { if (inst->cfOpt[i] == NULL) {
@ -1179,8 +1180,8 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosAcquireRef(streamBackendId, pState->streamBackendRid); taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendWrapper* handle = backend; SBackendWrapper* handle = backend;
SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));
taosThreadMutexLock(&handle->cfMutex);
taosThreadMutexLock(&handle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (ppInst != NULL && *ppInst != NULL) { if (ppInst != NULL && *ppInst != NULL) {
RocksdbCfInst* inst = *ppInst; RocksdbCfInst* inst = *ppInst;