From 130f7a292d718aba1c62db4f98cb81ca338f39bf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 19 May 2024 22:36:09 +0800 Subject: [PATCH] fix(stream): reset the error code when trying to restore checkpoint. --- source/libs/stream/src/streamBackendRocksdb.c | 5 +++-- source/libs/stream/src/streamMeta.c | 10 +++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 398980dfba..5b03876f02 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -390,7 +390,7 @@ int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkpPath, int64_t che } else if (type == DATA_UPLOAD_RSYNC) { return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath); } else { - stError("%s not remote backup checkpoint data for:%" PRId64" restore ", key, checkpointId); + stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId); } return -1; @@ -543,7 +543,7 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId } taosMemoryFree(checkpointRoot); - stDebug("%s check local default:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); + stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256); if (chkptId > 0) { @@ -557,6 +557,7 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId if (code != 0) { stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath, tstrerror(code), defaultPath); + code = 0; // reset the error code } } else { // no valid checkpoint id stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data", key); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 27bd001588..a6453871c8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -791,11 +791,11 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) { } int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { - int64_t chkpId = 0; + int64_t checkpointId = 0; TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - return chkpId; + return checkpointId; } void* pKey = NULL; @@ -816,16 +816,16 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { } tDecoderClear(&decoder); - chkpId = TMAX(chkpId, info.checkpointId); + checkpointId = TMAX(checkpointId, info.checkpointId); } - stDebug("get max chkp id: %" PRId64 "", chkpId); + stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, checkpointId); tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); - return chkpId; + return checkpointId; } // not allowed to return error code