From aad669e33c13ccb87b47e1dcdf442d8e09b4e5e9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jul 2023 14:46:13 +0000 Subject: [PATCH 1/3] add checkpoint --- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 40 ++++++++++++++++--- source/libs/stream/src/streamMeta.c | 8 +++- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 17a28b8b82..9431327f56 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -44,7 +44,7 @@ typedef struct { int64_t defaultCfInit; } SBackendWrapper; -void* streamBackendInit(const char* path); +void* streamBackendInit(const char* path, int64_t chkpId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cfccf1d286..24b675884c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -144,10 +144,40 @@ SCfInit ginitDict[] = { encodeValueFunc, decodeValueFunc}, }; -void* streamBackendInit(const char* path) { +int32_t streamBackendRebuildDirFromCheckpoint(const char* stateDir, const char* chkpDir) { + // impl later + + return 0; +} +void* streamBackendInit(const char* path, int64_t chkpId) { + int32_t code = 0; + char* state = taosMemoryCalloc(1, strlen(path) + 32); + + sprintf(state, "%s/%s", path, "state"); + if (chkpId != 0) { + char* chkp = taosMemoryCalloc(1, strlen(path) + 64); + sprintf(chkp, "%s/%s/checkpoint-%" PRId64 "", path, "checkpoints", chkpId); + if (taosIsDir(chkp)) { + if (taosIsDir(state)) { + // remove dir if exists + // taosRenameFile(const char *oldName, const char *newName) + taosRemoveDir(state); + } + taosMkDir(state); + code = streamBackendRebuildDirFromCheckpoint(state, chkp); + if (code != 0) { + qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); + } + + } else { + qError("failed to start stream backend at %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); + taosMkDir(state); + } + } + uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; - qDebug("start to init stream backend at %s", path); + qDebug("start to init stream backend at %s", state); SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); @@ -183,7 +213,7 @@ void* streamBackendInit(const char* path) { char* err = NULL; size_t nCf = 0; - char** cfs = rocksdb_list_column_families(opts, path, &nCf, &err); + char** cfs = rocksdb_list_column_families(opts, state, &nCf, &err); if (nCf == 0 || nCf == 1 || err != NULL) { taosMemoryFreeClear(err); pHandle->db = rocksdb_open(opts, path, &err); @@ -196,12 +226,12 @@ void* streamBackendInit(const char* path) { /* list all cf and get prefix */ - streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf); + streamStateOpenBackendCf(pHandle, (char*)state, cfs, nCf); } if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } - qDebug("succ to init stream backend at %s, backend:%p", path, pHandle); + qDebug("succ to init stream backend at %s, backend:%p", state, pHandle); return (void*)pHandle; _EXIT: diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bd8227a962..39bf1fd55e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,6 +23,8 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; +int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); + static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); @@ -103,7 +105,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->checkpointCap = 4; taosInitRWLatch(&pMeta->checkpointDirLock); - pMeta->streamBackend = streamBackendInit(streamPath); + int64_t chkpId = streamGetLatestCheckpointId(pMeta); + + pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); if (pMeta->streamBackend == NULL) { goto _err; } @@ -443,7 +447,7 @@ _err: return chkpId; } int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { - int64_t checkpointId = streamGetLatestCheckpointId(pMeta); + int64_t checkpointId = 0; TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { From e18c9ea672a14597cd003447c85415febe84e648 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 15 Jul 2023 04:26:34 +0000 Subject: [PATCH 2/3] add checkpoint --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 55 +++++++++++++++++-- source/libs/stream/src/streamMeta.c | 6 +- 5 files changed, 54 insertions(+), 13 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e586b1a028..50642f9c7d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -103,7 +103,7 @@ typedef struct { } SStreamQueueItem; typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); -typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver, int64_t checkpointId); +typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); typedef struct { int8_t type; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 5d99929c69..584b238d1b 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -168,7 +168,7 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId); +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqStreamTasksScanWal(STQ* pTq); int32_t tqStreamTasksStatusCheck(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 848468d3e0..5c2d23db20 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -743,7 +743,7 @@ end: void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId) { +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 24b675884c..bf05437117 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -144,12 +144,46 @@ SCfInit ginitDict[] = { encodeValueFunc, decodeValueFunc}, }; -int32_t streamBackendRebuildDirFromCheckpoint(const char* stateDir, const char* chkpDir) { - // impl later +bool isValidCheckpoint(const char* dir) { return true; } - return 0; +int32_t copyFiles(const char* src, const char* dst) { + int32_t code = 0; + // opt later, just hard link + int32_t sLen = strlen(src); + int32_t dLen = strlen(dst); + char* absSrcPath = taosMemoryCalloc(1, sLen + 64); + char* absDstPath = taosMemoryCalloc(1, dLen + 64); + + TdDirPtr pDir = taosOpenDir(src); + if (pDir == NULL) return 0; + + TdDirEntryPtr de = NULL; + + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + + sprintf(absSrcPath, "%s/%s", src, name); + sprintf(absDstPath, "%s/%s", dst, name); + if (!taosDirEntryIsDir(de)) { + code = taosCopyFile(absSrcPath, absDstPath); + if (code == -1) { + goto _err; + } + } + + memset(absSrcPath, 0, sLen + 64); + memset(absDstPath, 0, dLen + 64); + } + +_err: + taosMemoryFreeClear(absSrcPath); + taosMemoryFreeClear(absDstPath); + taosCloseDir(&pDir); + return code; } -void* streamBackendInit(const char* path, int64_t chkpId) { +int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { + // impl later int32_t code = 0; char* state = taosMemoryCalloc(1, strlen(path) + 32); @@ -157,14 +191,14 @@ void* streamBackendInit(const char* path, int64_t chkpId) { if (chkpId != 0) { char* chkp = taosMemoryCalloc(1, strlen(path) + 64); sprintf(chkp, "%s/%s/checkpoint-%" PRId64 "", path, "checkpoints", chkpId); - if (taosIsDir(chkp)) { + if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { if (taosIsDir(state)) { // remove dir if exists // taosRenameFile(const char *oldName, const char *newName) taosRemoveDir(state); } taosMkDir(state); - code = streamBackendRebuildDirFromCheckpoint(state, chkp); + code = copyFiles(chkp, state); if (code != 0) { qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); } @@ -174,9 +208,16 @@ void* streamBackendInit(const char* path, int64_t chkpId) { taosMkDir(state); } } + *dst = state; + return 0; +} +void* streamBackendInit(const char* path, int64_t chkpId) { uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; + char* state = NULL; + int32_t code = rebuildDirFromCheckpoint(path, chkpId, &state); + qDebug("start to init stream backend at %s", state); SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); pHandle->list = tdListNew(sizeof(SCfComparator)); @@ -232,6 +273,7 @@ void* streamBackendInit(const char* path, int64_t chkpId) { rocksdb_list_column_families_destroy(cfs, nCf); } qDebug("succ to init stream backend at %s, backend:%p", state, pHandle); + taosMemoryFreeClear(state); return (void*)pHandle; _EXIT: @@ -243,6 +285,7 @@ _EXIT: taosHashCleanup(pHandle->cfInst); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); + taosMemoryFree(state); taosMemoryFree(pHandle); qDebug("failed to init stream backend at %s", path); return NULL; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 39bf1fd55e..2976a65d1d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -249,7 +249,7 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver, checkpointId) < 0) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); return -1; } @@ -447,8 +447,6 @@ _err: return chkpId; } int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { - int64_t checkpointId = 0; - TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { return -1; @@ -477,7 +475,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { // remove duplicate void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer, checkpointId) < 0) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); From b4de0892e674f07558069f6ae11967cd68b445f3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 15 Jul 2023 09:35:33 +0000 Subject: [PATCH 3/3] add checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 26 +++++++++---------- source/libs/stream/src/streamMeta.c | 16 ++++++------ 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index bf05437117..9ba1db5b9e 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -212,13 +212,13 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } -void* streamBackendInit(const char* path, int64_t chkpId) { - uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; +void* streamBackendInit(const char* streamPath, int64_t chkpId) { + char* backendPath = NULL; + int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); - char* state = NULL; - int32_t code = rebuildDirFromCheckpoint(path, chkpId, &state); + qDebug("start to init stream backend at %s", backendPath); - qDebug("start to init stream backend at %s", state); + uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); @@ -254,12 +254,12 @@ void* streamBackendInit(const char* path, int64_t chkpId) { char* err = NULL; size_t nCf = 0; - char** cfs = rocksdb_list_column_families(opts, state, &nCf, &err); + char** cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err); if (nCf == 0 || nCf == 1 || err != NULL) { taosMemoryFreeClear(err); - pHandle->db = rocksdb_open(opts, path, &err); + pHandle->db = rocksdb_open(opts, backendPath, &err); if (err != NULL) { - qError("failed to open rocksdb, path:%s, reason:%s", path, err); + qError("failed to open rocksdb, path:%s, reason:%s", backendPath, err); taosMemoryFreeClear(err); goto _EXIT; } @@ -267,13 +267,13 @@ void* streamBackendInit(const char* path, int64_t chkpId) { /* list all cf and get prefix */ - streamStateOpenBackendCf(pHandle, (char*)state, cfs, nCf); + streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf); } if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } - qDebug("succ to init stream backend at %s, backend:%p", state, pHandle); - taosMemoryFreeClear(state); + qDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle); + taosMemoryFreeClear(backendPath); return (void*)pHandle; _EXIT: @@ -285,9 +285,9 @@ _EXIT: taosHashCleanup(pHandle->cfInst); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); - taosMemoryFree(state); + taosMemoryFree(backendPath); taosMemoryFree(pHandle); - qDebug("failed to init stream backend at %s", path); + qDebug("failed to init stream backend at %s", backendPath); return NULL; } void streamBackendCleanup(void* arg) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2976a65d1d..d31c4337fa 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -51,8 +51,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { goto _err; } - memset(streamPath, 0, len); + memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); code = taosMulModeMkDir(streamPath, 0755); if (code != 0) { @@ -90,13 +90,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - memset(streamPath, 0, len); - sprintf(streamPath, "%s/%s", pMeta->path, "state"); - code = taosMulModeMkDir(streamPath, 0755); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } + // memset(streamPath, 0, len); + // sprintf(streamPath, "%s/%s", pMeta->path, "state"); + // code = taosMulModeMkDir(streamPath, 0755); + // if (code != 0) { + // terrno = TAOS_SYSTEM_ERROR(code); + // goto _err; + // } pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);