From dea2e73d2e614c1643b8dea0c2fe7178086b128d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 17 Jul 2023 02:22:21 +0000 Subject: [PATCH] add checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 15 ++++++++------- source/libs/stream/src/streamMeta.c | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 9ba1db5b9e..55b53552d4 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -180,17 +180,16 @@ _err: taosMemoryFreeClear(absSrcPath); taosMemoryFreeClear(absDstPath); taosCloseDir(&pDir); - return code; + return code >= 0 ? 0 : -1; } int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later int32_t code = 0; char* state = taosMemoryCalloc(1, strlen(path) + 32); - sprintf(state, "%s/%s", path, "state"); if (chkpId != 0) { char* chkp = taosMemoryCalloc(1, strlen(path) + 64); - sprintf(chkp, "%s/%s/checkpoint-%" PRId64 "", path, "checkpoints", chkpId); + sprintf(chkp, "%s/%s/checkpoint%" PRId64 "", path, "checkpoints", chkpId); if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { if (taosIsDir(state)) { // remove dir if exists @@ -201,6 +200,8 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { code = copyFiles(chkp, state); if (code != 0) { qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + qInfo("succ to restart stream backend at checkpoint path: %s", chkp); } } else { @@ -458,7 +459,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); char tbuf[256] = {0}; - sprintf(tbuf, "%s/checkpoint-%" PRId64 "", path, id); + sprintf(tbuf, "%s/checkpoint%" PRId64 "", path, id); if (taosIsDir(tbuf)) { taosRemoveDir(tbuf); } @@ -499,7 +500,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { char checkpointPrefix[32] = {0}; int64_t checkpointId = 0; - int ret = sscanf(taosGetDirEntryName(de), "checkpoint-%" PRId64 "", &checkpointId); + int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId); if (ret == 1) { taosArrayPush(suffix, &checkpointId); } @@ -534,7 +535,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } char checkpointDir[256] = {0}; - snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint-%" PRId64, path, checkpointId); + snprintf(checkpointDir, tListLen(checkpointDir), "%s/checkpoint%" PRId64, path, checkpointId); SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); if (pHandle == NULL) { @@ -1630,10 +1631,10 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; + pCur->number = pState->number; pCur->db = ((SBackendCfWrapper*)pState->pTdbState->pBackendCfWrapper)->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); - pCur->number = pState->number; rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_prev(pCur->iter); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d31c4337fa..d35c980024 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -102,7 +102,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t)); - pMeta->checkpointCap = 4; + pMeta->checkpointCap = 8; taosInitRWLatch(&pMeta->checkpointDirLock); int64_t chkpId = streamGetLatestCheckpointId(pMeta);