From 8428a5be374aef352de78f9101ad4af5a3c809f8 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 4 Jul 2024 10:25:37 +0000 Subject: [PATCH] refactor backend --- include/util/taoserror.h | 1 + source/dnode/vnode/src/tq/tqStreamStateSnap.c | 11 +- source/libs/stream/src/streamBackendRocksdb.c | 429 +++++++++--------- source/libs/stream/src/streamCheckpoint.c | 58 ++- source/libs/stream/src/streamSnapshot.c | 92 ++-- source/util/src/terror.c | 1 + 6 files changed, 295 insertions(+), 297 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2de336d036..359872e8cd 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -136,6 +136,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E) +#define TSDB_CODE_THIRDPARTY_ERROR TAOS_DEF_ERROR_CODE(0, 0x012F) #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index be768e375e..07bfd52a9c 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -132,8 +132,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS // alloc pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = terrno; + code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pWriter->pTq = pTq; @@ -141,14 +140,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->ever = ever; if (taosMkDir(pTq->pStreamMeta->path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(terrno)); goto _err; } SStreamSnapWriter* pSnapWriter = NULL; - if (streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter) < 0) { + if ((code = streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter)) < 0) { goto _err; } @@ -157,14 +156,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->pWriterImpl = pSnapWriter; *ppWriter = pWriter; - return code; + return 0; _err: tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tstrerror(terrno)); taosMemoryFree(pWriter); *ppWriter = NULL; - return -1; + return code; } int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 54abba8bdc..057ff56aa9 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -213,38 +213,34 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { char* state = taosMemoryCalloc(1, cap); if (state == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state"); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; taosMemoryFree(state); - return terrno; + return TSDB_CODE_OUT_OF_RANGE; } if (chkpId != 0) { char* chkp = taosMemoryCalloc(1, cap); if (chkp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(state); - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } nBytes = snprintf(chkp, cap, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; taosMemoryFree(state); taosMemoryFree(chkp); - return terrno; + return TSDB_CODE_OUT_OF_RANGE; } if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { cleanDir(state, ""); code = backendCopyFiles(chkp, state); if (code != 0) { - stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(terrno))); + stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(code))); } else { stInfo("start to restart stream backend at checkpoint path: %s", chkp); } @@ -254,8 +250,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { tstrerror(TAOS_SYSTEM_ERROR(errno)), state); code = taosMkDir(state); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - code = terrno; + code = TAOS_SYSTEM_ERROR(errno); } } @@ -278,50 +273,48 @@ typedef struct { } SSChkpMetaOnS3; int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { - int32_t code = -1; + int32_t code = 0; int32_t cap = strlen(path) + 32; TdFilePtr pFile = NULL; char* metaPath = taosMemoryCalloc(1, cap); if (metaPath == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t n = snprintf(metaPath, cap, "%s%s%s", path, TD_DIRSEP, "META"); if (n <= 0 || n >= cap) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(metaPath); - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } pFile = taosOpenFile(path, TD_FILE_READ); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } char buf[256] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } SSChkpMetaOnS3* p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3)); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId, p->processName, &p->processId); if (n != 6) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; taosMemoryFree(p); goto _EXIT; } if (p->currChkptId != p->manifestChkptId) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; taosMemoryFree(p); goto _EXIT; } @@ -330,53 +323,52 @@ int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { _EXIT: taosCloseFile(&pFile); taosMemoryFree(metaPath); - code = terrno; return code; } int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) { - int32_t code = -1; + int32_t code = 0; int32_t nBytes = 0; int32_t cap = strlen(path) + 64; char* src = taosMemoryCalloc(1, cap); char* dst = taosMemoryCalloc(1, cap); if (src == NULL || dst == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) { - terrno = TSDB_CODE_INVALID_CFG; + code = TSDB_CODE_INVALID_CFG; goto _EXIT; } // rename current_chkp/mainfest to current for (int i = 0; i < 2; i++) { char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName); if (strlen(key) <= 0) { - terrno = TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; goto _EXIT; } nBytes = snprintf(src, cap, "%s%s%s_%" PRId64 "", path, TD_DIRSEP, key, pMeta->currChkptId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _EXIT; } if (taosStatFile(src, NULL, NULL, NULL) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } nBytes = snprintf(dst, cap, "%s%s%s", path, TD_DIRSEP, key); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _EXIT; } if (taosRenameFile(src, dst) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } @@ -389,11 +381,10 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t ch _EXIT: taosMemoryFree(src); taosMemoryFree(dst); - code = terrno; return code; } int32_t remoteChkpGetDelFile(char* path, SArray* toDel) { - int32_t code = -1; + int32_t code = 0; int32_t nBytes = 0; SSChkpMetaOnS3* pMeta = NULL; @@ -408,28 +399,24 @@ int32_t remoteChkpGetDelFile(char* path, SArray* toDel) { int32_t cap = strlen(key) + 32; char* p = taosMemoryCalloc(1, cap); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pMeta); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } nBytes = snprintf(p, cap, "%s_%" PRId64 "", key, pMeta->currChkptId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; taosMemoryFree(pMeta); taosMemoryFree(p); - return code; + return TSDB_CODE_OUT_OF_RANGE; } if (taosArrayPush(toDel, &p) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pMeta); taosMemoryFree(p); - return code; + return TSDB_CODE_OUT_OF_MEMORY; } } - code = 0; - return code; + return 0; } void cleanDir(const char* pPath, const char* id) { @@ -475,19 +462,18 @@ int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) { int32_t code = remoteChkp_readMetaData(chkpPath, &pMeta); if (code != 0) { - return -1; + return code; } if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) { taosMemoryFree(pMeta); - terrno = TSDB_CODE_INVALID_PARA; - return -1; + return TSDB_CODE_INVALID_PARA; } code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); if (code != 0) { taosMemoryFree(pMeta); - return -1; + return code; } return chkpAddExtraInfo(chkpPath, chkpId, pMeta->processId); @@ -504,22 +490,20 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId char* defaultTmp = taosMemoryCalloc(1, cap); if (defaultTmp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t nBytes = snprintf(defaultPath, cap, "%s%s", defaultPath, "_tmp"); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; taosMemoryFree(defaultPath); - return -1; + return TSDB_CODE_OUT_OF_RANGE; } if (taosIsDir(defaultTmp)) taosRemoveDir(defaultTmp); if (taosIsDir(defaultPath)) { code = taosRenameFile(defaultPath, defaultTmp); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } else { rename = 1; @@ -527,7 +511,7 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId } else { code = taosMkDir(defaultPath); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } } @@ -593,7 +577,7 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { const char* info = "info"; size_t infoLen = strlen(info); - int32_t code = -1; + int32_t code = 0; int32_t sLen = strlen(src); int32_t dLen = strlen(dst); int32_t cap = TMAX(sLen, dLen) + 64; @@ -602,14 +586,17 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { char* srcName = taosMemoryCalloc(1, cap); char* dstName = taosMemoryCalloc(1, cap); if (srcName == NULL || dstName == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + taosMemoryFree(srcName); + taosMemoryFree(dstName); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } // copy file to dst TdDirPtr pDir = taosOpenDir(src); if (pDir == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); + goto _ERROR; } errno = 0; @@ -622,36 +609,36 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { nBytes = snprintf(srcName, cap, "%s%s%s", src, TD_DIRSEP, name); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } nBytes = snprintf(dstName, cap, "%s%s%s", dst, TD_DIRSEP, name); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) { code = copyFiles_create(srcName, dstName, 0); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; } } else if (strncmp(name, info, strlen(name) <= infoLen ? strlen(name) : infoLen) == 0) { code = copyFiles_create(srcName, dstName, 0); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; } } else { code = copyFiles_hardlink(srcName, dstName, 0); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; } else { stDebug("succ hard link file:%s to %s", srcName, dstName); @@ -688,8 +675,7 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch if (code != TSDB_CODE_SUCCESS) { cleanDir(defaultPath, pTaskIdStr); stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote", - pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(terrno))); - terrno = 0; + pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(code))); code = TSDB_CODE_SUCCESS; } else { stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath, @@ -705,7 +691,7 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath, int64_t* processVer) { - int32_t code = -1; + int32_t code = 0; char* prefixPath = NULL; char* defaultPath = NULL; @@ -721,43 +707,43 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId checkpointPath = taosMemoryCalloc(1, cap); checkpointRoot = taosMemoryCalloc(1, cap); if (prefixPath == NULL || defaultPath == NULL || checkpointPath == NULL || checkpointRoot == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _EXIT; } nBytes = snprintf(prefixPath, cap, "%s%s%s", path, TD_DIRSEP, key); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _EXIT; } code = createDirIfNotExist(prefixPath); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } nBytes = snprintf(defaultPath, cap, "%s%s%s", prefixPath, TD_DIRSEP, "state"); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _EXIT; } code = createDirIfNotExist(defaultPath); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } nBytes = snprintf(checkpointRoot, cap, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _EXIT; } code = createDirIfNotExist(checkpointRoot); if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } @@ -766,19 +752,18 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId nBytes = snprintf(checkpointPath, cap, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _EXIT; } code = rebuildFromLocalCheckpoint(key, checkpointPath, chkptId, defaultPath, processVer); if (code != 0) { - terrno = 0; code = rebuildFromRemoteCheckpoint(key, checkpointPath, chkptId, defaultPath); } if (code != 0) { stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s, reason:%s", - checkpointPath, tstrerror(code), defaultPath, tstrerror(terrno)); + checkpointPath, tstrerror(code), defaultPath, tstrerror(code)); code = 0; // reset the error code } } else { // no valid checkpoint id @@ -802,21 +787,6 @@ _EXIT: return code; } -bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId) { - bool exist = true; - char* state = taosMemoryCalloc(1, strlen(path) + 32); - if (state == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return false; - } - sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); - if (!taosDirExist(state)) { - exist = false; - } - taosMemoryFree(state); - return exist; -} - void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); @@ -1313,12 +1283,14 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { if (cp == NULL || err != NULL) { stError("failed to do checkpoint at:%s, reason:%s", path, err); taosMemoryFreeClear(err); + code = TSDB_CODE_THIRDPARTY_ERROR; goto _ERROR; } rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err); if (err != NULL) { stError("failed to do checkpoint at:%s, reason:%s", path, err); taosMemoryFreeClear(err); + code = TSDB_CODE_THIRDPARTY_ERROR; } else { code = 0; } @@ -1332,13 +1304,17 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32 char* err = NULL; rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + if (flushOpt == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + rocksdb_flushoptions_set_wait(flushOpt, 1); rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err); if (err != NULL) { stError("failed to flush db before streamBackend clean up, reason:%s", err); taosMemoryFree(err); - code = -1; + code = TSDB_CODE_THIRDPARTY_ERROR; } rocksdb_flushoptions_destroy(flushOpt); return code; @@ -1346,28 +1322,47 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32 int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) { int32_t code = 0; - char* pChkpDir = taosMemoryCalloc(1, 256); - char* pChkpIdDir = taosMemoryCalloc(1, 256); + int32_t cap = strlen(path) + 256; + int32_t nBytes = 0; - sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints"); - code = taosMulModeMkDir(pChkpDir, 0755, true); - if (code != 0) { - stError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); - taosMemoryFree(pChkpDir); - taosMemoryFree(pChkpIdDir); - code = -1; - return code; + char* pChkpDir = taosMemoryCalloc(1, cap); + char* pChkpIdDir = taosMemoryCalloc(1, cap); + if (pChkpDir == NULL || pChkpIdDir == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + + nBytes = snprintf(pChkpDir, cap, "%s%s%s", path, TD_DIRSEP, "checkpoints"); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } + + nBytes = snprintf(pChkpIdDir, cap, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } + + code = taosMulModeMkDir(pChkpDir, 0755, true); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); + goto _EXIT; } - sprintf(pChkpIdDir, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId); if (taosIsDir(pChkpIdDir)) { stInfo("stream rm exist checkpoint%s", pChkpIdDir); taosRemoveDir(pChkpIdDir); } + *chkpDir = pChkpDir; *chkpIdDir = pChkpIdDir; - return 0; +_EXIT: + taosMemoryFree(pChkpDir); + taosMemoryFree(pChkpIdDir); + return code; } int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { @@ -1396,7 +1391,6 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { // remove chkpId from in-use-ckpkIdSet taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId); taskDbRemoveRef(pTaskDb); - code = -1; break; } @@ -1409,8 +1403,7 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { // remove chkpid from chkp-in-use set taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId); taskDbRemoveRef(pTaskDb); - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = -1; + code = TSDB_CODE_OUT_OF_MEMORY; break; } taosArrayPush(pSnap, &snap); @@ -1491,29 +1484,29 @@ int64_t taskGetDBRef(void* arg) { int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) { TdFilePtr pFile = NULL; - int32_t code = -1; + int32_t code = 0; char buf[256] = {0}; int32_t nBytes = 0; int32_t len = strlen(pChkpIdDir); if (len == 0) { - terrno = TSDB_CODE_INVALID_PARA; - stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(terrno)); - return terrno; + code = TSDB_CODE_INVALID_PARA; + stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code)); + return code; } int32_t cap = len + 64; char* pDst = taosMemoryCalloc(1, cap); if (pDst == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir); goto _EXIT; } nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; stError("failed to build dst to load extra info, dir:%s", pChkpIdDir); goto _EXIT; } @@ -1526,31 +1519,31 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) code = 0; goto _EXIT; } else { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to open file to load extra info, file:%s", pDst); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to open file to load extra info, file:%s, reason:%s", pDst, tstrerror(code)); } goto _EXIT; } if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(code)); goto _EXIT; } if (sscanf(buf, "%" PRId64 " %" PRId64 "", chkpId, processId) < 2) { - terrno = TSDB_CODE_INVALID_PARA; - stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); + code = TSDB_CODE_INVALID_PARA; + stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(code)); goto _EXIT; } code = 0; _EXIT: taosMemoryFree(pDst); taosCloseFile(&pFile); - return terrno; + return code; } int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { - int32_t code = -1; + int32_t code = 0; TdFilePtr pFile = NULL; @@ -1559,41 +1552,43 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { int32_t len = strlen(pChkpIdDir); if (len == 0) { - terrno = TSDB_CODE_INVALID_PARA; - stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(terrno)); - return -1; + code = TSDB_CODE_INVALID_PARA; + stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code)); + return code; } + int32_t cap = len + 64; char* pDst = taosMemoryCalloc(1, cap); if (pDst == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir); goto _EXIT; } nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP); if (nBytes <= 0 || nBytes >= cap) { - stError("failed to build dst to add extra info, dir:%s", pChkpIdDir); + code = TSDB_CODE_OUT_OF_RANGE; + stError("failed to build dst to add extra info, dir:%s, reason:%d", pChkpIdDir, tstrerror(code)); goto _EXIT; } pFile = taosOpenFile(pDst, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to open file to add extra info, file:%s", pDst); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to open file to add extra info, file:%s, reason:%s", pDst, tstrerror(code)); goto _EXIT; } nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId); if (nBytes <= 0 || nBytes >= sizeof(buf)) { - terrno = TSDB_CODE_OUT_OF_RANGE; - stError("failed to build content to add extra info, dir:%s", pChkpIdDir); + code = TSDB_CODE_OUT_OF_RANGE; + stError("failed to build content to add extra info, dir:%s,reason:%d", pChkpIdDir, tstrerror(code)); goto _EXIT; } if (nBytes != taosWriteFile(pFile, buf, nBytes)) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(code)); goto _EXIT; } code = 0; @@ -1606,17 +1601,18 @@ _EXIT: int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) { STaskDbWrapper* pTaskDb = arg; int64_t st = taosGetTimestampMs(); - int32_t code = -1; + int32_t code = 0; int64_t refId = pTaskDb->refId; if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { - return -1; + code = terrno; + terrno = 0; + return code; } char* pChkpDir = NULL; char* pChkpIdDir = NULL; - if (chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { - code = -1; + if ((code = chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir)) < 0) { goto _EXIT; } // Get all cf and acquire cfWrappter @@ -2404,16 +2400,15 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta int32_t code = 0; char* statePath = taosMemoryCalloc(1, strlen(path) + 128); if (statePath == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key); if (!taosDirExist(statePath)) { code = taosMulMkDir(statePath); if (code != 0) { - terrno = errno; - stError("failed to create dir: %s, reason:%s", statePath, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to create dir: %s, reason:%s", statePath, tstrerror(code)); taosMemoryFree(statePath); return code; } @@ -2422,15 +2417,14 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128); if (dbPath == NULL) { taosMemoryFree(statePath); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state"); if (!taosDirExist(dbPath)) { code = taosMulMkDir(dbPath); if (code != 0) { - terrno = errno; + code = TAOS_SYSTEM_ERROR(errno); stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code)); taosMemoryFree(statePath); taosMemoryFree(dbPath); @@ -2511,8 +2505,9 @@ _EXIT: STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) { char* statePath = NULL; char* dbPath = NULL; - - if (restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer) != 0) { + int code = 0; + if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) { + terrno = code; stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId, tstrerror(terrno)); return NULL; @@ -2521,17 +2516,14 @@ STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, i STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); if (pTaskDb != NULL) { int64_t chkpId = -1, ver = -1; - if (chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0) { + if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0)) { *processVer = ver; } else { - if (terrno == TSDB_CODE_OUT_OF_MEMORY) { - taskDbDestroy(pTaskDb, false); - return NULL; - } else { - // not info file exists, caller handle this situation - terrno = 0; - *processVer = -1; - } + terrno = code; + stError("failed to load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId, + tstrerror(terrno)); + taskDbDestroy(pTaskDb, false); + return NULL; } } @@ -2623,27 +2615,27 @@ void taskDbDestroy(void* pDb, bool flush) { void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { - int32_t code = -1; + int32_t code = 0; int64_t refId = pDb->refId; int32_t nBytes = 0; if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { - return -1; + code = terrno; + return code; } int32_t cap = strlen(pDb->path) + 128; char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } nBytes = snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; - return -1; + taosMemoryFree(buf); + return TSDB_CODE_OUT_OF_RANGE; } if (taosIsDir(buf)) { @@ -2660,20 +2652,28 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) { int32_t code = 0; + int32_t cap = strlen(pDb->path) + 32; SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; - char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32); + char* temp = taosMemoryCalloc(1, cap); if (temp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } - sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); + int32_t nBytes = snprintf(temp, cap, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(temp); + return TSDB_CODE_OUT_OF_RANGE; + } if (taosDirExist(temp)) { cleanDir(temp, idStr); } else { - taosMkDir(temp); + code = taosMkDir(temp); + if (code != 0) { + taosMemoryFree(temp); + return TAOS_SYSTEM_ERROR(errno); + } } code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp); @@ -4405,8 +4405,7 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { if (!isBkdDataMeta(name, len) && !taosHashGet(p1, name, len)) { char* fname = taosMemoryCalloc(1, len + 1); if (fname == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } strncpy(fname, name, len); taosArrayPush(diff, &fname); @@ -4483,6 +4482,7 @@ void dbChkpDebugInfo(SDbChkp* pDb) { } } int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { + int32_t code = 0; int32_t nBytes; taosThreadRwlockWrlock(&p->rwLock); @@ -4502,9 +4502,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { nBytes = snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); if (nBytes <= 0 || nBytes >= p->len) { - terrno = TSDB_CODE_OUT_OF_RANGE; taosThreadRwlockUnlock(&p->rwLock); - return terrno; + return TSDB_CODE_OUT_OF_RANGE; } taosArrayClearP(p->pAdd, taosMemoryFree); @@ -4513,9 +4512,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { TdDirPtr pDir = taosOpenDir(p->buf); if (pDir == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); taosThreadRwlockUnlock(&p->rwLock); - return terrno; + return TAOS_SYSTEM_ERROR(errno); } TdDirEntryPtr de = NULL; @@ -4528,7 +4526,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { p->pCurrent = taosStrdup(name); if (p->pCurrent == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } continue; @@ -4538,7 +4536,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { taosMemoryFreeClear(p->pManifest); p->pManifest = taosStrdup(name); if (p->pManifest == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } continue; @@ -4551,9 +4549,9 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { } } taosCloseDir(&pDir); - if (terrno != 0) { + if (code != 0) { taosThreadRwlockUnlock(&p->rwLock); - return terrno; + return code; } if (p->init == 0) { @@ -4564,9 +4562,8 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { if (name != NULL && !isBkdDataMeta(name, len)) { char* fname = taosMemoryCalloc(1, len + 1); if (fname == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosThreadRwlockUnlock(&p->rwLock); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } strncpy(fname, name, len); @@ -4587,7 +4584,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { taosArrayClearP(p->pDel, taosMemoryFree); taosHashClear(p->pSstTbl[1 - p->idx]); p->update = 0; - return terrno; + return code; } if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) { @@ -4604,7 +4601,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { taosThreadRwlockUnlock(&p->rwLock); - return 0; + return code; } void dbChkpDestroy(SDbChkp* pChkp); @@ -4698,7 +4695,7 @@ int32_t dbChkpInit(SDbChkp* p) { #endif int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { static char* chkpMeta = "META"; - int32_t code = -1; + int32_t code = 0; int32_t cap = p->len + 128; taosThreadRwlockRdlock(&p->rwLock); @@ -4708,30 +4705,33 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { char* srcDir = taosMemoryCalloc(1, cap); char* dstDir = taosMemoryCalloc(1, cap); if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } nBytes = snprintf(dstDir, cap, "%s", dname); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } if (!taosDirExist(srcDir)) { stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); + code = TSDB_CODE_INVALID_PARA; goto _ERROR; } int64_t chkpId = 0, processId = -1; - if (chkpLoadExtraInfo(srcDir, &chkpId, &processId) != 0) { - stError("failed to load extra info from %s, reason:%s", srcDir, terrno != 0 ? "unkown" : tstrerror(terrno)); + code = chkpLoadExtraInfo(srcDir, &chkpId, &processId); + if (code < 0) { + stError("failed to load extra info from %s, reason:%s", srcDir, code != 0 ? "unkown" : tstrerror(code)); + goto _ERROR; } @@ -4743,19 +4743,19 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { char* filename = taosArrayGetP(p->pAdd, i); nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, filename); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, filename); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } if (taosCopyFile(srcBuf, dstBuf) < 0) { - terrno = errno; - stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code)); goto _ERROR; } } @@ -4764,7 +4764,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { char* filename = taosArrayGetP(p->pDel, i); char* p = taosStrdup(filename); if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } taosArrayPush(list, &p); @@ -4776,19 +4776,19 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } if (taosCopyFile(srcBuf, dstBuf) < 0) { - terrno = errno; - stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code)); goto _ERROR; } @@ -4798,33 +4798,33 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } if (taosCopyFile(srcBuf, dstBuf) < 0) { - terrno = errno; - stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code)); goto _ERROR; } memset(dstBuf, 0, cap); nBytes = snprintf(dstDir, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code)); goto _ERROR; } @@ -4832,7 +4832,7 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { nBytes = snprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId, "processVer", processId); if (nBytes <= 0 || nBytes >= sizeof(content)) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir); taosCloseFile(&pFile); goto _ERROR; @@ -4840,8 +4840,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { nBytes = taosWriteFile(pFile, content, strlen(content)); if (nBytes != strlen(content)) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(terrno)); + code = TAOS_SYSTEM_ERROR(errno); + stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(code)); taosCloseFile(&pFile); goto _ERROR; } @@ -4859,10 +4859,11 @@ _ERROR: taosMemoryFree(srcDir); taosMemoryFree(dstDir); - return terrno; + return code; } SBkdMgt* bkdMgtCreate(char* path) { + terrno = 0; SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); if (p == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -4910,7 +4911,6 @@ void bkdMgtDestroy(SBkdMgt* bm) { } int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) { int32_t code = 0; - taosThreadRwlockWrlock(&bm->rwLock); SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL; @@ -4919,30 +4919,31 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, int32_t cap = strlen(bm->path) + 64; char* path = taosMemoryCalloc(1, cap); if (path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosThreadRwlockUnlock(&bm->rwLock); - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; taosMemoryFree(path); taosThreadRwlockUnlock(&bm->rwLock); - return terrno; + code = TSDB_CODE_OUT_OF_RANGE; + return code; } SDbChkp* p = dbChkpCreate(path, chkpId); if (p == NULL) { taosMemoryFree(path); taosThreadRwlockUnlock(&bm->rwLock); - return terrno; + code = terrno; + return code; } if (taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)) != 0) { dbChkpDestroy(p); taosThreadRwlockUnlock(&bm->rwLock); - return terrno; + code = terrno; + return code; } pChkp = p; @@ -4950,14 +4951,14 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, taosThreadRwlockUnlock(&bm->rwLock); return code; } else { - terrno = dbChkpGetDelta(pChkp, chkpId, NULL); + code = dbChkpGetDelta(pChkp, chkpId, NULL); if (code == 0) { - terrno = dbChkpDumpTo(pChkp, dname, list); + code = dbChkpDumpTo(pChkp, dname, list); } } taosThreadRwlockUnlock(&bm->rwLock); - return terrno; + return code; } #ifdef BUILD_NO_CALL diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1e77e70efa..a66c7a7cfa 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -548,15 +548,13 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l char* filePath = taosMemoryCalloc(1, cap); if (filePath == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP"); if (nBytes <= 0 || nBytes >= cap) { taosMemoryFree(filePath); - terrno = TSDB_CODE_OUT_OF_RANGE; - return -1; + return TSDB_CODE_OUT_OF_RANGE; } code = downloadCheckpointDataByName(id, "META", filePath); @@ -584,19 +582,18 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); if (toDelFiles == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, pTask->id.idStr)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(terrno)); + stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code)); } if (type == DATA_UPLOAD_S3) { if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) { stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId, - tstrerror(terrno)); + tstrerror(code)); } } @@ -1003,11 +1000,13 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) { int32_t nBytes = 0; if (s3Init() != 0) { - return -1; + return TSDB_CODE_THIRDPARTY_ERROR; } TdDirPtr pDir = taosOpenDir(path); - if (pDir == NULL) return -1; + if (pDir == NULL) { + return TAOS_SYSTEM_ERROR(errno); + } TdDirEntryPtr de = NULL; while ((de = taosReadDir(pDir)) != NULL) { @@ -1018,13 +1017,13 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) { if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) { nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name); if (nBytes <= 0 || nBytes >= sizeof(filename)) { - code = -1; + code = TSDB_CODE_OUT_OF_RANGE; break; } } else { nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); if (nBytes <= 0 || nBytes >= sizeof(filename)) { - code = -1; + code = TSDB_CODE_OUT_OF_RANGE; break; } } @@ -1032,14 +1031,13 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) { char object[PATH_MAX] = {0}; nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); if (nBytes <= 0 || nBytes >= sizeof(object)) { - code = -1; + code = TSDB_CODE_OUT_OF_RANGE; break; } - if (s3PutObjectFromFile2(filename, object, 0) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - code = -1; - stError("[s3] failed to upload checkpoint:%s", filename); + code = s3PutObjectFromFile2(filename, object, 0); + if (code != 0) { + stError("[s3] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code)); } else { stDebug("[s3] upload checkpoint:%s", filename); } @@ -1054,21 +1052,18 @@ int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } nBytes = snprintf(buf, cap, "%s/%s", id, fname); if (nBytes <= 0 || nBytes >= cap) { taosMemoryFree(buf); - terrno = TSDB_CODE_OUT_OF_RANGE; - return -1; + return TSDB_CODE_OUT_OF_RANGE; } - - if (s3GetObjectToFile(buf, dstName) != 0) { + int32_t code = s3GetObjectToFile(buf, dstName); + if (code != 0) { taosMemoryFree(buf); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + return TAOS_SYSTEM_ERROR(errno); } taosMemoryFree(buf); return 0; @@ -1102,9 +1097,8 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { // fileName: CURRENT int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { - terrno = TSDB_CODE_INVALID_PARA; stError("down load checkpoint data parameters invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } if (strlen(tsSnodeAddress) != 0) { @@ -1133,9 +1127,8 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path) { int32_t deleteCheckpoint(const char* id) { if (id == NULL || strlen(id) == 0) { - terrno = TSDB_CODE_INVALID_PARA; stError("deleteCheckpoint parameters invalid"); - return terrno; + return TSDB_CODE_INVALID_PARA; } if (strlen(tsSnodeAddress) != 0) { return deleteRsync(id); @@ -1156,8 +1149,9 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { char* tmp = object; int32_t code = s3DeleteObjects((const char**)&tmp, 1); if (code != 0) { - return code; + return TSDB_CODE_THIRDPARTY_ERROR; } + return code; } int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { @@ -1180,14 +1174,14 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code); if (code < 0) { stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code)); - return -1; + return TSDB_CODE_INVALID_MSG; } void* buf = rpcMallocCont(tlen); if (buf == NULL) { stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } SEncoder encoder; diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 57723132d8..878cb2ac71 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -228,12 +228,12 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) { return 0; } int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { - terrno = 0; + int32_t code = 0; TdDirPtr pDir = taosOpenDir(pSnapFile->path); if (NULL == pDir) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path); - return terrno; + code = TAOS_SYSTEM_ERROR(errno); + stError("%s failed to open %s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, tstrerror(code)); + return code; } TdDirEntryPtr pDirEntry; @@ -242,7 +242,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) { pSnapFile->pCurrent = taosStrdup(name); if (pSnapFile->pCurrent == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } continue; @@ -250,7 +250,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) { pSnapFile->pMainfest = taosStrdup(name); if (pSnapFile->pMainfest == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } continue; @@ -258,7 +258,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) { pSnapFile->pOptions = taosStrdup(name); if (pSnapFile->pOptions == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } continue; @@ -267,7 +267,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { 0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) { pSnapFile->pCheckpointMeta = taosStrdup(name); if (pSnapFile->pCheckpointMeta == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } continue; @@ -276,7 +276,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { 0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) { pSnapFile->pCheckpointSelfcheck = taosStrdup(name); if (pSnapFile->pCheckpointSelfcheck == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } continue; @@ -285,17 +285,17 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { 0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { char* sst = taosStrdup(name); if (sst == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } taosArrayPush(pSnapFile->pSst, &sst); } } taosCloseDir(&pDir); - return terrno; + return code; } int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { - terrno = 0; + int32_t code = 0; int32_t nBytes = 0; int32_t cap = strlen(pSnap->dbPrefixPath) + 256; @@ -307,28 +307,28 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke nBytes = snprintf(path, cap, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", pSnap->chkpId); if (nBytes <= 0 || nBytes >= cap) { - terrno = TSDB_CODE_OUT_OF_RANGE; + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } if (!taosIsDir(path)) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto _ERROR; } pSnapFile->pSst = taosArrayInit(16, sizeof(void*)); pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } pSnapFile->path = path; pSnapFile->snapInfo = *pSnap; - if ((terrno = snapFileReadMeta(pSnapFile)) != 0) { + if ((code = snapFileReadMeta(pSnapFile)) != 0) { goto _ERROR; } - if ((terrno = snapFileGenMeta(pSnapFile)) != 0) { + if ((code = snapFileGenMeta(pSnapFile)) != 0) { goto _ERROR; } @@ -337,7 +337,7 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke _ERROR: taosMemoryFree(path); - return terrno; + return code; } void snapFileDestroy(SBackendSnapFile2* pSnap) { taosMemoryFree(pSnap->pCheckpointMeta); @@ -365,34 +365,32 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { } int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { // impl later - - terrno = 0; + int32_t code = 0; SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); if (pSnapInfoSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } - terrno = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); - if (terrno != 0) { - stError("failed to do task db snap info, reason:%s", tstrerror(terrno)); + code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); + if (code != 0) { + stError("failed to do task db snap info, reason:%s", tstrerror(code)); taosArrayDestroy(pSnapInfoSet); - return terrno; + return code; } SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); if (pDbSnapSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosArrayDestroy(pSnapInfoSet); - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i); SBackendSnapFile2 snapFile = {0}; - terrno = streamBackendSnapInitFile(path, pSnap, &snapFile); - ASSERT(terrno == 0); + code = streamBackendSnapInitFile(path, pSnap, &snapFile); + ASSERT(code == 0); taosArrayPush(pDbSnapSet, &snapFile); } pHandle->pDbSnapSet = pDbSnapSet; @@ -403,7 +401,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta _err: streamSnapHandleDestroy(pHandle); - return terrno; + return code; } void streamSnapHandleDestroy(SStreamSnapHandle* handle) { @@ -431,8 +429,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa // impl later SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); if (pReader == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = streamSnapHandleInit(&pReader->handle, (char*)path, pMeta); @@ -498,10 +495,10 @@ _NEXT: int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); if (nread == -1) { taosMemoryFree(buf); - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, item->type, tstrerror(code)); - return -1; + return code; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, @@ -558,6 +555,7 @@ _NEXT: // SMetaSnapWriter ======================================== int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter) { // impl later + int32_t code = 0; SStreamSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SStreamSnapWriter)); if (pWriter == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -568,23 +566,23 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path pHandle->metaPath = taosStrdup(path); if (pHandle->metaPath == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pWriter); - return terrno; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); if (pHandle->pDbSnapSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; streamSnapWriterClose(pWriter, 0); - return terrno; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } SBackendSnapFile2 snapFile = {0}; if (taosArrayPush(pHandle->pDbSnapSet, &snapFile) == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; streamSnapWriterClose(pWriter, 0); - return terrno; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } *ppWriter = pWriter; @@ -607,7 +605,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t if (pSnapFile->fd == 0) { pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pSnapFile->fd == NULL) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP, pHdr->name, tstrerror(code)); } @@ -615,7 +613,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); if (bytes != pHdr->size) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); return code; } else { @@ -636,12 +634,16 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pSnapFile->fd == NULL) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP, pHdr->name, tstrerror(code)); } - taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); + if (taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset) != pHdr->size) { + code = TAOS_SYSTEM_ERROR(errno); + stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); + return code; + } stInfo("succ to write data %s", pItem->name); pSnapFile->offset += pHdr->size; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c5bba6fa53..4563e21c6e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -96,6 +96,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space") +TAOS_DEFINE_ERROR(TSDB_CODE_THIRDPARTY_ERROR, "third party error, please check the log") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")