From bb72f07a94fb89c5e96aaf9f86e0dd82d8ba6a87 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Jul 2023 16:38:46 +0800 Subject: [PATCH] rm expire checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 6 +++--- source/libs/stream/src/streamMeta.c | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index bfcedd2d53..2ab55687a2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -417,8 +417,8 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints"); if (!taosDirExist(checkpointPath)) { - return 0; // no checkpoint, nothing to load + return 0; } TdDirPtr pDir = taosOpenDir(checkpointPath); @@ -1053,7 +1053,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + //rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1174,7 +1174,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); + //rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b2dabd6356..6b3f917176 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -96,12 +96,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - pMeta->streamBackend = streamBackendInit(streamPath); - if (pMeta->streamBackend == NULL) { - goto _err; - } - - pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); @@ -109,6 +103,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->checkpointCap = 4; taosInitRWLatch(&pMeta->checkpointDirLock); + pMeta->streamBackend = streamBackendInit(streamPath); + if (pMeta->streamBackend == NULL) { + goto _err; + } + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + code = streamBackendLoadCheckpointInfo(pMeta); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code);