fix(stream): reset the error code when trying to restore checkpoint.

This commit is contained in:
Haojun Liao 2024-05-19 22:36:09 +08:00
parent 2b44211928
commit 130f7a292d
2 changed files with 8 additions and 7 deletions

View File

@ -390,7 +390,7 @@ int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkpPath, int64_t che
} else if (type == DATA_UPLOAD_RSYNC) { } else if (type == DATA_UPLOAD_RSYNC) {
return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath); return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath);
} else { } else {
stError("%s not remote backup checkpoint data for:%" PRId64" restore ", key, checkpointId); stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId);
} }
return -1; return -1;
@ -543,7 +543,7 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
} }
taosMemoryFree(checkpointRoot); taosMemoryFree(checkpointRoot);
stDebug("%s check local default:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256); char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256);
if (chkptId > 0) { if (chkptId > 0) {
@ -557,6 +557,7 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
if (code != 0) { if (code != 0) {
stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath, stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath,
tstrerror(code), defaultPath); tstrerror(code), defaultPath);
code = 0; // reset the error code
} }
} else { // no valid checkpoint id } else { // no valid checkpoint id
stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data", key); stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data", key);

View File

@ -791,11 +791,11 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) {
} }
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
int64_t chkpId = 0; int64_t checkpointId = 0;
TBC* pCur = NULL; TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
return chkpId; return checkpointId;
} }
void* pKey = NULL; void* pKey = NULL;
@ -816,16 +816,16 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
chkpId = TMAX(chkpId, info.checkpointId); checkpointId = TMAX(checkpointId, info.checkpointId);
} }
stDebug("get max chkp id: %" PRId64 "", chkpId); stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, checkpointId);
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
return chkpId; return checkpointId;
} }
// not allowed to return error code // not allowed to return error code