diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 53b45f13a2..cbe6dcc886 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -373,24 +373,24 @@ int32_t createDirIfNotExist(const char* pPath) { } } -int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { +int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) { int32_t code = 0; - if (taosIsDir(chkptPath)) { - taosRemoveDir(chkptPath); - stDebug("remove local checkpoint data dir:%s succ", chkptPath); + if (taosIsDir(checkpointPath)) { + taosRemoveDir(checkpointPath); + stDebug("remove local checkpoint data dir:%s succ", checkpointPath); } cleanDir(defaultPath, key); stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath); - code = streamTaskDownloadCheckpointData(key, chkptPath); + code = streamTaskDownloadCheckpointData(key, checkpointPath); if (code != 0) { stError("failed to download checkpoint data:%s", key); return code; } stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key); - return backendCopyFiles(chkptPath, defaultPath); + return backendCopyFiles(checkpointPath, defaultPath); } int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { @@ -399,29 +399,45 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId return code; } - int32_t len = strlen(defaultPath) + 32; - char* tmp = taosMemoryCalloc(1, len); - sprintf(tmp, "%s%s", defaultPath, "_tmp"); + int32_t nBytes; + int32_t cap = strlen(defaultPath) + 32; + + char* tmp = taosMemoryCalloc(1, cap); + if (tmp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + nBytes = snprintf(tmp, cap, "%s%s", defaultPath, "_tmp"); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + taosMemoryFree(tmp); + return -1; + } + if (taosIsDir(tmp)) taosRemoveDir(tmp); if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp); // SArray* list = taosArrayInit(2, sizeof(void*)); SSChkpMetaOnS3* pMeta; code = remoteChkp_readMetaData(chkpPath, &pMeta); - if (code == 0) { - code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); - } + if (code == 0) code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); + taosMemoryFree(pMeta); - // taosArrayDestroyP(list, taosMemoryFree); if (code == 0) { - taosMkDir(defaultPath); + code = taosMkDir(defaultPath); + } + + if (code == 0) { code = backendCopyFiles(chkpPath, defaultPath); } if (code != 0) { if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath); - if (taosIsDir(tmp)) taosRenameFile(tmp, defaultPath); + if (taosIsDir(tmp)) { + code = taosRenameFile(tmp, defaultPath); + } } else { taosRemoveDir(tmp); } @@ -430,12 +446,12 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId return code; } -int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { +int32_t rebuildFromRemoteCheckpoint(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_S3) { - return rebuildFromRemoteChkp_s3(key, chkptPath, checkpointId, defaultPath); + return rebuildFromRemoteChkp_s3(key, checkpointPath, checkpointId, defaultPath); } else if (type == DATA_UPLOAD_RSYNC) { - return rebuildFromRemoteChkp_rsync(key, chkptPath, checkpointId, defaultPath); + return rebuildFromRemoteChkp_rsync(key, checkpointPath, checkpointId, defaultPath); } else { stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId); } @@ -570,69 +586,78 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId int64_t* processVer) { int32_t code = -1; - size_t pathLen = strlen(path); - char* prefixPath = NULL; - char* defaultPath = NULL; + char* prefixPath = NULL; + char* defaultPath = NULL; + char* checkpointPath = NULL; + char* checkpointRoot = NULL; + + int32_t cap = strlen(path) + 128; + int32_t nBytes; // alloc buf - prefixPath = taosMemoryCalloc(1, pathLen + 64); - if (prefixPath == NULL) { + prefixPath = taosMemoryCalloc(1, cap); + defaultPath = taosMemoryCalloc(1, cap); + checkpointPath = taosMemoryCalloc(1, cap); + checkpointRoot = taosMemoryCalloc(1, cap); + if (prefixPath == NULL || defaultPath == NULL || checkpointPath == NULL || checkpointRoot == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } - sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key); + nBytes = snprintf(prefixPath, cap, "%s%s%s", path, TD_DIRSEP, key); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } + code = createDirIfNotExist(prefixPath); if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } - defaultPath = taosMemoryCalloc(1, pathLen + 128); - if (defaultPath == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + nBytes = snprintf(defaultPath, cap, "%s%s%s", prefixPath, TD_DIRSEP, "state"); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; goto _EXIT; } - sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state"); code = createDirIfNotExist(defaultPath); if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } - char* checkpointRoot = taosMemoryCalloc(1, pathLen + 48); - if (checkpointRoot == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + nBytes = snprintf(checkpointRoot, cap, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; goto _EXIT; } - sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); code = createDirIfNotExist(checkpointRoot); if (code != 0) { - taosMemoryFreeClear(checkpointRoot); + terrno = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } - taosMemoryFreeClear(checkpointRoot); stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); - - char* chkptPath = taosMemoryCalloc(1, pathLen + 128); - if (chkptPath == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _EXIT; - } - if (chkptId > 0) { - snprintf(chkptPath, pathLen + 127, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, - "checkpoint", chkptId); + nBytes = snprintf(checkpointPath, cap, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", chkptId); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } - code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath, processVer); + code = rebuildFromLocalCheckpoint(key, checkpointPath, chkptId, defaultPath, processVer); if (code != 0) { - code = rebuildFromRemoteCheckpoint(key, chkptPath, chkptId, defaultPath); + terrno = 0; + code = rebuildFromRemoteCheckpoint(key, checkpointPath, chkptId, defaultPath); } if (code != 0) { - stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath, - tstrerror(code), defaultPath); + stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s, reason:%s", + checkpointPath, tstrerror(code), defaultPath, tstrerror(terrno)); code = 0; // reset the error code } } else { // no valid checkpoint id @@ -641,15 +666,18 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId cleanDir(defaultPath, key); } - taosMemoryFree(chkptPath); - *dbPath = defaultPath; *dbPrefixPath = prefixPath; - return 0; + defaultPath = NULL; + prefixPath = NULL; + + code = 0; _EXIT: taosMemoryFree(defaultPath); taosMemoryFree(prefixPath); + taosMemoryFree(checkpointPath); + taosMemoryFree(checkpointRoot); return code; } @@ -4334,7 +4362,14 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { int32_t sstLen = strlen(pSST); memset(p->buf, 0, p->len); - sprintf(p->buf, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + + nBytes = + snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + if (nBytes <= 0 || nBytes >= p->len) { + terrno = TSDB_CODE_OUT_OF_RANGE; + taosThreadRwlockUnlock(&p->rwLock); + return -1; + } taosArrayClearP(p->pAdd, taosMemoryFree); taosArrayClearP(p->pDel, taosMemoryFree); @@ -4518,10 +4553,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { char* srcBuf = taosMemoryCalloc(1, cap); char* dstBuf = taosMemoryCalloc(1, cap); - char* srcDir = taosMemoryCalloc(1, cap); char* dstDir = taosMemoryCalloc(1, cap); - if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR;