From ef247cdb1d1d0559b6d328482b4256c79f3e08db Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Aug 2023 01:49:26 +0000 Subject: [PATCH] support reopen stream state --- include/libs/stream/tstream.h | 10 +-- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 4 +- source/libs/stream/src/streamBackendRocksdb.c | 84 +++++++++---------- source/libs/stream/src/streamMeta.c | 18 ++-- 4 files changed, 58 insertions(+), 58 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4c0005ece7..16c47024ef 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -369,11 +369,11 @@ typedef struct SStreamMeta { int32_t chkptNotReadyTasks; - int64_t checkpointId; - SArray* checkpointSaved; - SArray* checkpointInUse; - int32_t checkpointCap; - SRWLatch checkpointDirLock; + int64_t chkpId; + SArray* chkpSaved; + SArray* chkpInUse; + int32_t chkpCap; + SRWLatch chkpDirLock; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 87d174715d..5f77ea50e6 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -47,11 +47,11 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pReader->sver = sver; pReader->ever = ever; - int64_t checkpointId = meta ? meta->checkpointId : 0; + int64_t chkpId = meta ? meta->chkpId : 0; SStreamSnapReader* pSnapReader = NULL; - if (streamSnapReaderOpen(pTq, sver, checkpointId, pTq->path, &pSnapReader) == 0) { + if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) { pReader->complete = 1; } else { code = -1; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 85abe203d4..14c94a7996 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -372,8 +372,8 @@ int32_t copyFiles(const char* src, const char* dst) { // opt later, just hard link int32_t sLen = strlen(src); int32_t dLen = strlen(dst); - char* absSrcPath = taosMemoryCalloc(1, sLen + 64); - char* absDstPath = taosMemoryCalloc(1, dLen + 64); + char* srcName = taosMemoryCalloc(1, sLen + 64); + char* dstName = taosMemoryCalloc(1, dLen + 64); TdDirPtr pDir = taosOpenDir(src); if (pDir == NULL) return 0; @@ -383,22 +383,22 @@ int32_t copyFiles(const char* src, const char* dst) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; - sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name); - sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name); + sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); + sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); if (!taosDirEntryIsDir(de)) { - code = taosCopyFile(absSrcPath, absDstPath); + code = taosCopyFile(srcName, dstName); if (code == -1) { goto _err; } } - memset(absSrcPath, 0, sLen + 64); - memset(absDstPath, 0, dLen + 64); + memset(srcName, 0, sLen + 64); + memset(dstName, 0, dLen + 64); } _err: - taosMemoryFreeClear(absSrcPath); - taosMemoryFreeClear(absDstPath); + taosMemoryFreeClear(srcName); + taosMemoryFreeClear(dstName); taosCloseDir(&pDir); return code >= 0 ? 0 : -1; } @@ -626,75 +626,75 @@ void streamBackendHandleCleanup(void* arg) { int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { SStreamMeta* pMeta = arg; - taosWLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); int64_t tc = 0; - int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); + int32_t sz = taosArrayGetSize(pMeta->chkpSaved); if (sz <= 0) { - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWUnLockLatch(&pMeta->chkpDirLock); return -1; } else { - tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved); + tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved); } - taosArrayPush(pMeta->checkpointInUse, &tc); + taosArrayPush(pMeta->chkpInUse, &tc); *checkpoint = tc; - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWUnLockLatch(&pMeta->chkpDirLock); return 0; } /* * checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--| - * checkpointInUse: |--cp2--|--cp4--| - * checkpointInUse is doing translation, cannot del until + * chkpInUse: |--cp2--|--cp4--| + * chkpInUse is doing translation, cannot del until * replication is finished */ int32_t delObsoleteCheckpoint(void* arg, const char* path) { SStreamMeta* pMeta = arg; - taosWLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); - SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t)); - SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t)); + SArray* chkpDel = taosArrayInit(10, sizeof(int64_t)); + SArray* chkpDup = taosArrayInit(10, sizeof(int64_t)); int64_t minId = 0; - if (taosArrayGetSize(pMeta->checkpointInUse) >= 1) { - minId = *(int64_t*)taosArrayGet(pMeta->checkpointInUse, 0); + if (taosArrayGetSize(pMeta->chkpInUse) >= 1) { + minId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0); - for (int i = 0; i < taosArrayGetSize(pMeta->checkpointSaved); i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); + for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) { + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); if (id >= minId) { - taosArrayPush(checkpointDup, &id); + taosArrayPush(chkpDup, &id); } else { - taosArrayPush(checkpointDel, &id); + taosArrayPush(chkpDel, &id); } } } else { - int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); - int32_t dsz = sz - pMeta->checkpointCap; // del size + int32_t sz = taosArrayGetSize(pMeta->chkpSaved); + int32_t dsz = sz - pMeta->chkpCap; // del size for (int i = 0; i < dsz; i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); - taosArrayPush(checkpointDel, &id); + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); + taosArrayPush(chkpDel, &id); } for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); - taosArrayPush(checkpointDup, &id); + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); + taosArrayPush(chkpDup, &id); } } - taosArrayDestroy(pMeta->checkpointSaved); - pMeta->checkpointSaved = checkpointDup; + taosArrayDestroy(pMeta->chkpSaved); + pMeta->chkpSaved = chkpDup; - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWUnLockLatch(&pMeta->chkpDirLock); - for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { - int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); + for (int i = 0; i < taosArrayGetSize(chkpDel); i++) { + int64_t id = *(int64_t*)taosArrayGet(chkpDel, i); char tbuf[256] = {0}; sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id); if (taosIsDir(tbuf)) { taosRemoveDir(tbuf); } } - taosArrayDestroy(checkpointDel); + taosArrayDestroy(chkpDel); return 0; } @@ -742,7 +742,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { for (int i = 0; i < taosArrayGetSize(suffix); i++) { int64_t id = *(int64_t*)taosArrayGet(suffix, i); - taosArrayPush(pMeta->checkpointSaved, &id); + taosArrayPush(pMeta->chkpSaved, &id); } taosArrayDestroy(suffix); @@ -794,9 +794,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } rocksdb_checkpoint_object_destroy(cp); } - taosWLockLatch(&pMeta->checkpointDirLock); - taosArrayPush(pMeta->checkpointSaved, &checkpointId); - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); + taosArrayPush(pMeta->chkpSaved, &checkpointId); + taosWUnLockLatch(&pMeta->chkpDirLock); delObsoleteCheckpoint(arg, path); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 907ff2b48a..20a62963cc 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -91,12 +91,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); - pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t)); - pMeta->checkpointCap = 8; - taosInitRWLatch(&pMeta->checkpointDirLock); + pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t)); + pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t)); + pMeta->chkpCap = 8; + taosInitRWLatch(&pMeta->chkpDirLock); int64_t chkpId = streamGetLatestCheckpointId(pMeta); + pMeta->chkpId = chkpId; pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); if (pMeta->streamBackend == NULL) { @@ -109,7 +110,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TAOS_SYSTEM_ERROR(code); goto _err; } - taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); @@ -182,9 +182,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { taosHashClear(pMeta->pTaskBackendUnique); - taosArrayClear(pMeta->checkpointSaved); + taosArrayClear(pMeta->chkpSaved); - taosArrayClear(pMeta->checkpointInUse); + taosArrayClear(pMeta->chkpInUse); return 0; } @@ -222,8 +222,8 @@ void streamMetaClose(SStreamMeta* pMeta) { taosThreadMutexDestroy(&pMeta->backendMutex); taosHashCleanup(pMeta->pTaskBackendUnique); - taosArrayDestroy(pMeta->checkpointSaved); - taosArrayDestroy(pMeta->checkpointInUse); + taosArrayDestroy(pMeta->chkpSaved); + taosArrayDestroy(pMeta->chkpInUse); taosMemoryFree(pMeta); }