From aad669e33c13ccb87b47e1dcdf442d8e09b4e5e9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 14:46:13 +0000 Subject: [PATCH] add checkpoint --- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 40 ++++++++++++++++--- source/libs/stream/src/streamMeta.c | 8 +++- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 17a28b8b82..9431327f56 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -44,7 +44,7 @@ typedef struct { int64_t defaultCfInit; } SBackendWrapper; -void* streamBackendInit(const char* path); +void* streamBackendInit(const char* path, int64_t chkpId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cfccf1d286..24b675884c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -144,10 +144,40 @@ SCfInit ginitDict[] = { encodeValueFunc, decodeValueFunc}, }; -void* streamBackendInit(const char* path) { +int32_t streamBackendRebuildDirFromCheckpoint(const char* stateDir, const char* chkpDir) { + // impl later + + return 0; +} +void* streamBackendInit(const char* path, int64_t chkpId) { + int32_t code = 0; + char* state = taosMemoryCalloc(1, strlen(path) + 32); + + sprintf(state, "%s/%s", path, "state"); + if (chkpId != 0) { + char* chkp = taosMemoryCalloc(1, strlen(path) + 64); + sprintf(chkp, "%s/%s/checkpoint-%" PRId64 "", path, "checkpoints", chkpId); + if (taosIsDir(chkp)) { + if (taosIsDir(state)) { + // remove dir if exists + // taosRenameFile(const char *oldName, const char *newName) + taosRemoveDir(state); + } + taosMkDir(state); + code = streamBackendRebuildDirFromCheckpoint(state, chkp); + if (code != 0) { + qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); + } + + } else { + qError("failed to start stream backend at %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); + taosMkDir(state); + } + } + uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; - qDebug("start to init stream backend at %s", path); + qDebug("start to init stream backend at %s", state); SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); @@ -183,7 +213,7 @@ void* streamBackendInit(const char* path) { char* err = NULL; size_t nCf = 0; - char** cfs = rocksdb_list_column_families(opts, path, &nCf, &err); + char** cfs = rocksdb_list_column_families(opts, state, &nCf, &err); if (nCf == 0 || nCf == 1 || err != NULL) { taosMemoryFreeClear(err); pHandle->db = rocksdb_open(opts, path, &err); @@ -196,12 +226,12 @@ void* streamBackendInit(const char* path) { /* list all cf and get prefix */ - streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf); + streamStateOpenBackendCf(pHandle, (char*)state, cfs, nCf); } if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } - qDebug("succ to init stream backend at %s, backend:%p", path, pHandle); + qDebug("succ to init stream backend at %s, backend:%p", state, pHandle); return (void*)pHandle; _EXIT: diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bd8227a962..39bf1fd55e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,6 +23,8 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; +int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); + static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); @@ -103,7 +105,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->checkpointCap = 4; taosInitRWLatch(&pMeta->checkpointDirLock); - pMeta->streamBackend = streamBackendInit(streamPath); + int64_t chkpId = streamGetLatestCheckpointId(pMeta); + + pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); if (pMeta->streamBackend == NULL) { goto _err; } @@ -443,7 +447,7 @@ _err: return chkpId; } int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { - int64_t checkpointId = streamGetLatestCheckpointId(pMeta); + int64_t checkpointId = 0; TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {