From 7d30a6e27ad7fff52ecf0bab7432bbfb1827f3a9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 30 Jun 2023 11:48:46 +0000 Subject: [PATCH] vnode snapshot read --- include/libs/stream/tstream.h | 5 ++ source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 73 ++++++++++++++++--- source/libs/stream/src/streamMeta.c | 10 ++- 4 files changed, 76 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 425c9449f3..4fdee619a1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -347,6 +347,11 @@ typedef struct SStreamMeta { void* streamBackend; int64_t streamBackendRid; int64_t checkpointTs; + + SArray* checkpointSaved; + SArray* checkpointInUse; + int32_t checkpointCap; + SRWLatch checkpointDirLock; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index ef4589f1a9..e60ef55902 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -46,7 +46,7 @@ typedef struct { void* streamBackendInit(const char* path); void streamBackendCleanup(void* arg); -int32_t streamBackendDoCheckpoint(int64_t rid, const char* cpPath); +int32_t streamBackendDoCheckpoint(void* pMeta, const char* path); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index e180f23206..d20eb558ee 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -195,16 +195,69 @@ void streamBackendCleanup(void* arg) { qDebug("destroy stream backend backend:%p", pHandle); return; } -int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { + +/* + * checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--| + * checkpointInUse: |--cp2--|--cp4--|, checkpointDir in checkpointInUse do replicate trans, cannot del until + * replication is finished + */ +int32_t delObsoleteCheckpoint(void* arg, const char* path) { + SStreamMeta* pMeta = arg; + + taosWLockLatch(&pMeta->checkpointDirLock); + int64_t checkpointId = pMeta->checkpointTs; + taosArrayPush(pMeta->checkpointSaved, &checkpointId); + + SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t)); + SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t)); + + int64_t minId = 0; + if (taosArrayGetSize(pMeta->checkpointInUse) >= 1) { + minId = *(int64_t*)taosArrayGet(pMeta->checkpointInUse, 0); + + for (int i = 0; i < taosArrayGetSize(pMeta->checkpointSaved); i++) { + int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); + if (id >= minId) { + taosArrayPush(checkpointDup, &id); + } else { + taosArrayPush(checkpointDel, &id); + } + } + } else { + for (int i = taosArrayGetSize(pMeta->checkpointSaved); i >= 0; i--) { + int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); + if (taosArrayGetSize(checkpointDup) < pMeta->checkpointCap) { + taosArrayPush(checkpointDup, &id); + } else { + taosArrayPush(checkpointDel, &id); + } + } + } + taosArrayDestroy(pMeta->checkpointSaved); + pMeta->checkpointSaved = checkpointDup; + + taosWUnLockLatch(&pMeta->checkpointDirLock); + + for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { + int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); + char tbuf[256] = {0}; + sprintf(tbuf, "%s/checkpoint_%" PRId64 "", path, id); + if (taosIsDir(tbuf)) { + taosRemoveDir(tbuf); + } + } + return 0; +} +int32_t streamBackendDoCheckpoint(void* arg, const char* path) { + SStreamMeta* pMeta = arg; + int64_t backendRid = pMeta->streamBackendRid; + int64_t checkpointId = pMeta->checkpointTs; int64_t st = taosGetTimestampMs(); int32_t code = -1; SBackendHandle* pHandle = taosAcquireRef(streamBackendId, backendRid); - static int checkpointSuffix = 0; - char newDir[256] = {0}; - char oldDir[256] = {0}; - sprintf(oldDir, "%s/checkpoint_%d", path, checkpointSuffix); - sprintf(newDir, "%s/checkpoint_%d", path, 1 - checkpointSuffix); + char checkpointDir[256] = {0}; + sprintf(checkpointDir, "%s/checkpoint_%" PRId64 "", path, checkpointId); if (pHandle == NULL) { return -1; @@ -220,7 +273,7 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { goto _ERROR; } - rocksdb_checkpoint_create(cp, newDir, 64 << 20, &err); + rocksdb_checkpoint_create(cp, checkpointDir, 64 << 20, &err); if (err != NULL) { qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); taosMemoryFreeClear(err); @@ -231,11 +284,7 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { } rocksdb_checkpoint_object_destroy(cp); } - if (taosIsDir(oldDir)) { - taosRemoveDir(oldDir); - } - taosRenameFile(newDir, oldDir); - + delObsoleteCheckpoint(arg, path); _ERROR: taosReleaseRef(streamBackendId, backendRid); return code; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4727634232..8e5ffc2cf6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -93,6 +93,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); + pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t)); + pMeta->checkpointCap = 4; + taosInitRWLatch(&pMeta->checkpointDirLock); taosMemoryFree(streamPath); @@ -108,6 +112,7 @@ _err: if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); taosMemoryFree(pMeta); + qError("failed to open stream meta"); return NULL; } @@ -138,6 +143,9 @@ void streamMetaClose(SStreamMeta* pMeta) { taosRemoveRef(streamBackendId, pMeta->streamBackendRid); pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); taosMemoryFree(pMeta->path); + + taosArrayDestroy(pMeta->checkpointSaved); + taosArrayDestroy(pMeta->checkpointInUse); taosMemoryFree(pMeta); } @@ -419,6 +427,6 @@ int32_t streamDoCheckpoint(SStreamMeta* pMeta) { qError("failed to create chechpoint %s, reason:%s", buf, tstrerror(code)); return code; } - code = streamBackendDoCheckpoint(pMeta->streamBackendRid, buf); + code = streamBackendDoCheckpoint((void*)pMeta, buf); return code; }