diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 1ca419ec4e..4a7dbead23 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -138,6 +138,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E) +#define TSDB_CODE_THIRDPARTY_ERROR TAOS_DEF_ERROR_CODE(0, 0x012F) /* used below when a RocksDB call reports an error */ #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 290266d94a..c79fc66a06 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -51,10 +51,9 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS SStreamSnapReader* pSnapReader = NULL; - if (streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader) == 0) { + if ((code = streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader)) == 0) { pReader->complete = 1; } else { - code = -1; taosMemoryFree(pReader); goto _err; } @@ -75,7 +74,7 @@ _err: int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t code = 0; tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER); - streamSnapReaderClose(pReader->pReaderImpl); + code = streamSnapReaderClose(pReader->pReaderImpl); taosMemoryFree(pReader); return code; } @@ -138,32 +137,36 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; - taosMkDir(pTq->pStreamMeta->path); - - SStreamSnapWriter* pSnapWriter = NULL; - if (streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter) < 0) { + if (taosMkDir(pTq->pStreamMeta->path) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + 
tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode), + STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(code)); goto _err; } - tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, pTq->pStreamMeta->path); + SStreamSnapWriter* pSnapWriter = NULL; + if ((code = streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter)) < 0) { + goto _err; + } + + tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, + pTq->pStreamMeta->path); pWriter->pWriterImpl = pSnapWriter; *ppWriter = pWriter; - return code; + return 0; + _err: tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tstrerror(code)); taosMemoryFree(pWriter); *ppWriter = NULL; - return -1; + return code; } int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { - int32_t code = 0; tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); - code = streamSnapWriterClose(pWriter->pWriterImpl, rollback); - - return code; + return streamSnapWriterClose(pWriter->pWriterImpl, rollback); } int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 6b81ac87ee..f0647f44a3 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -131,20 +131,21 @@ typedef struct { TdThreadRwlock rwLock; } SBkdMgt; -bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId); +#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "" + +bool streamBackendDataIsExist(const char* path, int64_t chkpId); void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); void 
streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); -int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId); +int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId, int64_t processver); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); -STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId); +STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer); void taskDbDestroy(void* pBackend, bool flush); void taskDbDestroy2(void* pBackend); -int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); @@ -249,7 +250,7 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t taskDbBuildSnap(void* arg, SArray* pSnap); int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); -int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); +int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId); SBkdMgt* bkdMgtCreate(char* path); int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); @@ -259,6 +260,7 @@ void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* id); +int32_t remoteChkpGetDelFile(char* path, SArray* toDel); void* taskAcquireDb(int64_t refId); void taskReleaseDb(int64_t refId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f5c0ac57df..8b87019ee0 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -150,6 +150,9 @@ static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const cha void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp); 
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp); +int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId); +int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId); + #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); int32_t copyFiles(const char* src, const char* dst); uint32_t nextPow2(uint32_t x); @@ -194,28 +197,50 @@ int32_t getCfIdx(const char* cfName) { return idx; } -bool isValidCheckpoint(const char* dir) { return true; } +bool isValidCheckpoint(const char* dir) { + // not implemented yet + return true; +} +/* + *copy pChkpIdDir's file to state dir + */ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later int32_t code = 0; + int32_t cap = strlen(path) + 64; + int32_t nBytes = 0; + + char* state = taosMemoryCalloc(1, cap); + if (state == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state"); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(state); + return TSDB_CODE_OUT_OF_RANGE; + } - /*param@1: checkpointId dir - param@2: state - copy pChkpIdDir's file to state dir - opt to set hard link to previous file - */ - char* state = taosMemoryCalloc(1, strlen(path) + 32); - sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); if (chkpId != 0) { - char* chkp = taosMemoryCalloc(1, strlen(path) + 64); - sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + char* chkp = taosMemoryCalloc(1, cap); + if (chkp == NULL) { + taosMemoryFree(state); + return TSDB_CODE_OUT_OF_MEMORY; + } + + nBytes = snprintf(chkp, cap, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(state); + taosMemoryFree(chkp); + return TSDB_CODE_OUT_OF_RANGE; + } + if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { cleanDir(state, ""); code = backendCopyFiles(chkp, 
state); - stInfo("copy snap file from %s to %s", chkp, state); if (code != 0) { - stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); + stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(code)); /* backendCopyFiles already returns an encoded error code */ } else { stInfo("start to restart stream backend at checkpoint path: %s", chkp); } @@ -223,101 +248,175 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { } else { stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)), state); - taosMkDir(state); + code = taosMkDir(state); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + } } taosMemoryFree(chkp); } *dst = state; - return 0; + return code; } -int32_t remoteChkp_readMetaData(char* path, SArray* list) { - char* metaPath = taosMemoryCalloc(1, strlen(path)); - sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META"); +typedef struct { + char pCurrName[24]; + int64_t currChkptId; - TdFilePtr pFile = taosOpenFile(path, TD_FILE_READ); - if (pFile == NULL) { - return -1; + char pManifestName[24]; + int64_t manifestChkptId; + + char processName[24]; + int64_t processId; +} SSChkpMetaOnS3; + +int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { + int32_t code = 0; + int32_t cap = strlen(path) + 32; + TdFilePtr pFile = NULL; + + char* metaPath = taosMemoryCalloc(1, cap); + if (metaPath == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } - char buf[128] = {0}; - if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { + int32_t n = snprintf(metaPath, cap, "%s%s%s", path, TD_DIRSEP, "META"); + if (n <= 0 || n >= cap) { taosMemoryFree(metaPath); - taosCloseFile(&pFile); - return -1; + return TSDB_CODE_OUT_OF_RANGE; } - int32_t len = strnlen(buf, tListLen(buf)); - for (int i = 0; i < len; i++) { - if (buf[i] == '\n') { - char* item = taosMemoryCalloc(1, i + 1); - memcpy(item, buf, i); 
- taosArrayPush(list, &item); - - item = taosMemoryCalloc(1, len - i); - memcpy(item, buf + i + 1, len - i - 1); - taosArrayPush(list, &item); - } + pFile = taosOpenFile(metaPath, TD_FILE_READ); /* open the META file built above, not the checkpoint dir itself */ + if (pFile == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; } + char buf[256] = {0}; + if (taosReadFile(pFile, buf, sizeof(buf) - 1) <= 0) { /* keep the last byte as NUL so sscanf sees a terminated string */ + code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } + + SSChkpMetaOnS3* p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3)); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + n = sscanf(buf, "%23[^_]_%" SCNd64 "\n%23[^_]_%" SCNd64 "\n%23[^_]_%" SCNd64, p->pCurrName, &p->currChkptId, /* a plain %s would swallow the "_<id>" suffix; stop at '_' and bound each name to its 24-byte field (assumes names contain no '_') */ + p->pManifestName, &p->manifestChkptId, p->processName, &p->processId); + if (n != 6) { + code = TSDB_CODE_INVALID_MSG; + taosMemoryFree(p); + goto _EXIT; + } + + if (p->currChkptId != p->manifestChkptId) { + code = TSDB_CODE_INVALID_MSG; + taosMemoryFree(p); + goto _EXIT; + } + *pMeta = p; + code = 0; +_EXIT: taosCloseFile(&pFile); taosMemoryFree(metaPath); - return 0; + return code; } -int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) { - int8_t valid = 0; - for (int i = 0; i < strlen(name); i++) { - if (name[i] == '_') { - memcpy(prename, name, i); - if (taosStr2int64(name + i + 1) != chkpId) { - break; - } else { - valid = 1; - } - } + +int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) { + int32_t code = 0; + int32_t nBytes = 0; + + int32_t cap = strlen(path) + 64; + char* src = taosMemoryCalloc(1, cap); + char* dst = taosMemoryCalloc(1, cap); + if (src == NULL || dst == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; } - return valid; -} -int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t 
chkpId) { - int32_t complete = 1; - int32_t len = strlen(path) + 32; - char* src = taosMemoryCalloc(1, len); - char* dst = taosMemoryCalloc(1, len); - int8_t count = 0; - for (int i = 0; i < taosArrayGetSize(list); i++) { - char* p = taosArrayGetP(list, i); - sprintf(src, "%s%s%s", path, TD_DIRSEP, p); + if (pMeta->currChkptId 
!= chkpId || pMeta->manifestChkptId != chkpId) { + code = TSDB_CODE_INVALID_CFG; + goto _EXIT; + } + // rename current_chkp/manifest_chkp to current/manifest + for (int i = 0; i < 2; i++) { + char* key = (i == 0 ? pMeta->pCurrName : pMeta->pManifestName); + if (strlen(key) <= 0) { + code = TSDB_CODE_INVALID_PARA; + goto _EXIT; + } + + nBytes = snprintf(src, cap, "%s%s%s_%" PRId64 "", path, TD_DIRSEP, key, pMeta->currChkptId); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } - // check file exist if (taosStatFile(src, NULL, NULL, NULL) != 0) { - complete = 0; - break; + code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; } - // check file name - char temp[64] = {0}; - if (remoteChkp_validMetaFile(p, temp, chkpId)) { - count++; + nBytes = snprintf(dst, cap, "%s%s%s", path, TD_DIRSEP, key); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; } - // rename file - sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp); - taosRenameFile(src, dst); + if (taosRenameFile(src, dst) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } - memset(src, 0, len); - memset(dst, 0, len); - } - if (count != taosArrayGetSize(list)) { - complete = 0; + memset(src, 0, cap); + memset(dst, 0, cap); } + code = 0; +// rename manifest_chkp to manifest +_EXIT: taosMemoryFree(src); taosMemoryFree(dst); + return code; +} +int32_t remoteChkpGetDelFile(char* path, SArray* toDel) { + int32_t code = 0; + int32_t nBytes = 0; - return complete == 1 ? 0 : -1; + SSChkpMetaOnS3* pMeta = NULL; + code = remoteChkp_readMetaData(path, &pMeta); + if (code != 0) { + return code; + } + + for (int i = 0; i < 2; i++) { + char* key = (i == 0 ? 
pMeta->pCurrName : pMeta->pManifestName); + + int32_t cap = strlen(key) + 32; + char* p = taosMemoryCalloc(1, cap); + if (p == NULL) { + taosMemoryFree(pMeta); + return TSDB_CODE_OUT_OF_MEMORY; + } + + nBytes = snprintf(p, cap, "%s_%" PRId64 "", key, pMeta->currChkptId); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(pMeta); + taosMemoryFree(p); + return TSDB_CODE_OUT_OF_RANGE; + } + if (taosArrayPush(toDel, &p) == NULL) { + taosMemoryFree(pMeta); + taosMemoryFree(p); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + taosMemoryFree(pMeta); /* the meta is only needed to build the names; release it on success as well */ + return 0; } void cleanDir(const char* pPath, const char* id) { @@ -330,73 +429,126 @@ void cleanDir(const char* pPath, const char* id) { } } -void validateDir(const char* pPath) { +int32_t createDirIfNotExist(const char* pPath) { if (!taosIsDir(pPath)) { - taosMulMkDir(pPath); + return taosMulMkDir(pPath); + } else { + return 0; } } -int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { +int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) { int32_t code = 0; - if (taosIsDir(chkptPath)) { - taosRemoveDir(chkptPath); - stDebug("remove local checkpoint data dir:%s succ", chkptPath); + if (taosIsDir(checkpointPath)) { + taosRemoveDir(checkpointPath); + stDebug("remove local checkpoint data dir:%s succ", checkpointPath); } cleanDir(defaultPath, key); stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath); - code = streamTaskDownloadCheckpointData(key, chkptPath); + code = streamTaskDownloadCheckpointData(key, checkpointPath); if (code != 0) { stError("failed to download checkpoint data:%s", key); return code; } stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key); - return backendCopyFiles(chkptPath, defaultPath); + return backendCopyFiles(checkpointPath, 
defaultPath); +} + +int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) { + SSChkpMetaOnS3* 
pMeta = NULL; + + int32_t code = remoteChkp_readMetaData(chkpPath, &pMeta); + if (code != 0) { + return code; + } + + if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) { + taosMemoryFree(pMeta); + return TSDB_CODE_INVALID_PARA; + } + + code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); + if (code != 0) { + taosMemoryFree(pMeta); + return code; + } + int64_t processId = pMeta->processId; /* copy out before the meta is freed */ + taosMemoryFree(pMeta); + return chkpAddExtraInfo(chkpPath, chkpId, processId); } int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { + int8_t rename = 0; int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; } - int32_t len = strlen(defaultPath) + 32; - char* tmp = taosMemoryCalloc(1, len); - sprintf(tmp, "%s%s", defaultPath, "_tmp"); - if (taosIsDir(tmp)) taosRemoveDir(tmp); - if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp); + int32_t cap = strlen(defaultPath) + 32; - SArray* list = taosArrayInit(2, sizeof(void*)); - code = remoteChkp_readMetaData(chkpPath, list); - if (code == 0) { - code = remoteChkp_validAndCvtMeta(chkpPath, list, chkpId); - } - taosArrayDestroyP(list, taosMemoryFree); - - if (code == 0) { - taosMkDir(defaultPath); - code = backendCopyFiles(chkpPath, defaultPath); + char* defaultTmp = taosMemoryCalloc(1, cap); + if (defaultTmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } - if (code != 0) { - if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath); - if (taosIsDir(tmp)) taosRenameFile(tmp, defaultPath); + int32_t nBytes = snprintf(defaultTmp, cap, "%s%s", defaultPath, "_tmp"); /* build the backup name in our own buffer, never in the caller's defaultPath */ + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(defaultTmp); + return TSDB_CODE_OUT_OF_RANGE; + } + + if (taosIsDir(defaultTmp)) taosRemoveDir(defaultTmp); + if (taosIsDir(defaultPath)) { + if (taosRenameFile(defaultPath, defaultTmp) != 0) { + 
code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } + rename = 1; + code = taosMkDir(defaultPath); /* the rename moved defaultPath away; recreate it as the copy target */ + if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); goto _EXIT; } } else { - taosRemoveDir(tmp); + code = 
taosMkDir(defaultPath); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } } - taosMemoryFree(tmp); + code = rebuildDataFromS3(chkpPath, chkpId); + if (code != 0) { + goto _EXIT; + } + + code = backendCopyFiles(chkpPath, defaultPath); + if (code != 0) { + goto _EXIT; + } + code = 0; + +_EXIT: + if (code != 0) { + /* roll back: drop the partially rebuilt dir, then restore the previous one */ + if (taosIsDir(defaultPath)) taosRemoveDir(defaultPath); + if (rename) { + taosRenameFile(defaultTmp, defaultPath); + } + } else if (taosIsDir(defaultTmp)) { + taosRemoveDir(defaultTmp); /* success: the backup of the previous state dir is no longer needed */ + } + + taosMemoryFree(defaultTmp); return code; } -int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { +int32_t rebuildFromRemoteCheckpoint(const char* key, char* checkpointPath, int64_t checkpointId, char* defaultPath) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_S3) { - return rebuildFromRemoteChkp_s3(key, chkptPath, checkpointId, defaultPath); + return rebuildFromRemoteChkp_s3(key, checkpointPath, checkpointId, defaultPath); } else if (type == DATA_UPLOAD_RSYNC) { - return rebuildFromRemoteChkp_rsync(key, chkptPath, checkpointId, defaultPath); + return rebuildFromRemoteChkp_rsync(key, checkpointPath, checkpointId, defaultPath); } else { stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId); } @@ -423,46 +575,70 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { const char* current = "CURRENT"; size_t currLen = strlen(current); + const char* info = "info"; + size_t infoLen = strlen(info); + int32_t code = 0; int32_t sLen = strlen(src); int32_t dLen = strlen(dst); - char* srcName = taosMemoryCalloc(1, sLen + 64); - char* dstName = taosMemoryCalloc(1, dLen + 64); + int32_t cap = TMAX(sLen, dLen) + 64; + int32_t nBytes = 0; + + char* srcName = 
taosMemoryCalloc(1, cap); + char* dstName = taosMemoryCalloc(1, cap); + if (srcName == NULL || dstName == NULL) { + taosMemoryFree(srcName); + taosMemoryFree(dstName); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } 
// copy file to dst TdDirPtr pDir = taosOpenDir(src); if (pDir == NULL) { - taosMemoryFree(srcName); - taosMemoryFree(dstName); code = TAOS_SYSTEM_ERROR(errno); - - errno = 0; - return code; + goto _ERROR; } errno = 0; TdDirEntryPtr de = NULL; - while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) { continue; } - sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); - sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); + nBytes = snprintf(srcName, cap, "%s%s%s", src, TD_DIRSEP, name); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstName, cap, "%s%s%s", dst, TD_DIRSEP, name); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) { code = copyFiles_create(srcName, dstName, 0); if (code != 0) { - code = TAOS_SYSTEM_ERROR(code); + code = TAOS_SYSTEM_ERROR(errno); stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; } + } else if (strncmp(name, info, strlen(name) <= infoLen ? 
strlen(name) : infoLen) == 0) { + code = copyFiles_create(srcName, dstName, 0); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code)); + goto _ERROR; + } + } else { code = copyFiles_hardlink(srcName, dstName, 0); if (code != 0) { - code = TAOS_SYSTEM_ERROR(code); + code = TAOS_SYSTEM_ERROR(errno); stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; } else { @@ -470,28 +646,26 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { } } - memset(srcName, 0, sLen + 64); - memset(dstName, 0, dLen + 64); + memset(srcName, 0, cap); + memset(dstName, 0, cap); } taosMemoryFreeClear(srcName); taosMemoryFreeClear(dstName); taosCloseDir(&pDir); - errno = 0; return code; _ERROR: taosMemoryFreeClear(srcName); taosMemoryFreeClear(dstName); taosCloseDir(&pDir); - errno = 0; return code; } int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); } static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId, - const char* defaultPath) { + const char* defaultPath, int64_t* processVer) { int32_t code = 0; cleanDir(defaultPath, pTaskIdStr); @@ -502,7 +676,7 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch if (code != TSDB_CODE_SUCCESS) { cleanDir(defaultPath, pTaskIdStr); stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote", - pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); + pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(code))); code = TSDB_CODE_SUCCESS; } else { stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath, @@ -516,41 +690,81 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch return code; } -int32_t 
restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) { +int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath, + int64_t* processVer) { int32_t code = 0; - char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128); - sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key); + char* prefixPath = NULL; + char* defaultPath = NULL; + char* checkpointPath = NULL; + char* checkpointRoot = NULL; - validateDir(prefixPath); + int32_t cap = strlen(path) + 128; + int32_t nBytes; - char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256); - sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state"); + // alloc buf + prefixPath = taosMemoryCalloc(1, cap); + defaultPath = taosMemoryCalloc(1, cap); + checkpointPath = taosMemoryCalloc(1, cap); + checkpointRoot = taosMemoryCalloc(1, cap); + if (prefixPath == NULL || defaultPath == NULL || checkpointPath == NULL || checkpointRoot == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } - validateDir(defaultPath); - int32_t pathLen = strlen(path) + 256; + nBytes = snprintf(prefixPath, cap, "%s%s%s", path, TD_DIRSEP, key); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } - char* checkpointRoot = taosMemoryCalloc(1, pathLen); - sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); + code = createDirIfNotExist(prefixPath); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } - validateDir(checkpointRoot); - taosMemoryFree(checkpointRoot); + nBytes = snprintf(defaultPath, cap, "%s%s%s", prefixPath, TD_DIRSEP, "state"); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } + + code = createDirIfNotExist(defaultPath); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } + + nBytes = snprintf(checkpointRoot, cap, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints"); + if (nBytes <= 0 || 
nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } + + code = createDirIfNotExist(checkpointRoot); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _EXIT; + } stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId); - - char* chkptPath = taosMemoryCalloc(1, pathLen); if (chkptId > 0) { - snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", - chkptId); + nBytes = snprintf(checkpointPath, cap, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", chkptId); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } - code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath); + code = rebuildFromLocalCheckpoint(key, checkpointPath, chkptId, defaultPath, processVer); if (code != 0) { - code = rebuildFromRemoteCheckpoint(key, chkptPath, chkptId, defaultPath); + code = rebuildFromRemoteCheckpoint(key, checkpointPath, chkptId, defaultPath); } if (code != 0) { - stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath, - tstrerror(code), defaultPath); + stError("failed to start stream backend at %s, restart from default defaultPath:%s, reason:%s", checkpointPath, + defaultPath, tstrerror(code)); code = 0; // reset the error code } } else { // no valid checkpoint id @@ -559,21 +773,40 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId cleanDir(defaultPath, key); } - taosMemoryFree(chkptPath); - *dbPath = defaultPath; *dbPrefixPath = prefixPath; + defaultPath = NULL; + prefixPath = NULL; + code = 0; + +_EXIT: + taosMemoryFree(defaultPath); + taosMemoryFree(prefixPath); + taosMemoryFree(checkpointPath); + taosMemoryFree(checkpointRoot); return code; } +bool streamBackendDataIsExist(const char* path, int64_t chkpId) { + bool exist = true; + int32_t cap = strlen(path) + 32; -bool 
streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId) { - bool exist = true; - char* state = taosMemoryCalloc(1, strlen(path) + 32); - sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); - if (!taosDirExist(state)) { - exist = false; + char* state = taosMemoryCalloc(1, cap); + if (state == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return false; } + + int32_t nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state"); /* same width as cap so the truncation check cannot be fooled by narrowing */ + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + exist = false; + } else { + if (!taosDirExist(state)) { + exist = false; + } + } + taosMemoryFree(state); return exist; } @@ -1074,12 +1307,14 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { if (cp == NULL || err != NULL) { stError("failed to do checkpoint at:%s, reason:%s", path, err); taosMemoryFreeClear(err); + code = TSDB_CODE_THIRDPARTY_ERROR; goto _ERROR; } rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err); if (err != NULL) { stError("failed to do checkpoint at:%s, reason:%s", path, err); taosMemoryFreeClear(err); + code = TSDB_CODE_THIRDPARTY_ERROR; } else { code = 0; } @@ -1093,13 +1328,17 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32 char* err = NULL; rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + if (flushOpt == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + rocksdb_flushoptions_set_wait(flushOpt, 1); rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err); if (err != NULL) { stError("failed to flush db before streamBackend clean up, reason:%s", err); taosMemoryFree(err); - code = -1; + code = TSDB_CODE_THIRDPARTY_ERROR; } rocksdb_flushoptions_destroy(flushOpt); return code; @@ -1107,31 +1346,51 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32 int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) { int32_t code = 0; - char* pChkpDir = 
taosMemoryCalloc(1, 256); - char* pChkpIdDir = taosMemoryCalloc(1, 256); + 
int32_t cap = strlen(path) + 256; + int32_t nBytes = 0; - sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints"); - code = taosMulModeMkDir(pChkpDir, 0755, true); - if (code != 0) { - stError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); - taosMemoryFree(pChkpDir); - taosMemoryFree(pChkpIdDir); - code = -1; - return code; + char* pChkpDir = taosMemoryCalloc(1, cap); + char* pChkpIdDir = taosMemoryCalloc(1, cap); + if (pChkpDir == NULL || pChkpIdDir == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + + nBytes = snprintf(pChkpDir, cap, "%s%s%s", path, TD_DIRSEP, "checkpoints"); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } + + nBytes = snprintf(pChkpIdDir, cap, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _EXIT; + } + + code = taosMulModeMkDir(pChkpDir, 0755, true); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); + goto _EXIT; } - sprintf(pChkpIdDir, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId); if (taosIsDir(pChkpIdDir)) { stInfo("stream rm exist checkpoint%s", pChkpIdDir); taosRemoveDir(pChkpIdDir); } + *chkpDir = pChkpDir; *chkpIdDir = pChkpIdDir; - return 0; +_EXIT: + taosMemoryFree(pChkpDir); + taosMemoryFree(pChkpIdDir); + return code; } int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { + // vnode task->db SStreamMeta* pMeta = arg; taosThreadMutexLock(&pMeta->backendMutex); @@ -1140,27 +1399,42 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { while (pIter) { STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter; - taskDbAddRef(pTaskDb); - int64_t chkpId = pTaskDb->chkpId; - taskDbRefChkp(pTaskDb, chkpId); - code = taskDbDoCheckpoint(pTaskDb, chkpId); - if (code != 0) { - taskDbUnRefChkp(pTaskDb, chkpId); + void* p = taskDbAddRef(pTaskDb); + if (p == NULL) { + 
terrno = 0; + pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); + continue; } - taskDbRemoveRef(pTaskDb); + // add chkpId to in-use-ckpkIdSet + taskDbRefChkp(pTaskDb, pTaskDb->chkpId); + + code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId, ((SStreamTask*)pTaskDb->pTask)->chkInfo.processedVer); + if (code != 0) { + // remove chkpId from in-use-ckpkIdSet + taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId); + taskDbRemoveRef(pTaskDb); + break; + } SStreamTask* pTask = pTaskDb->pTask; SStreamTaskSnap snap = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .chkpId = pTaskDb->chkpId, .dbPrefixPath = taosStrdup(pTaskDb->path)}; + if (snap.dbPrefixPath == NULL) { + // remove chkpid from chkp-in-use set + taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId); + taskDbRemoveRef(pTaskDb); + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } taosArrayPush(pSnap, &snap); + pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); } taosThreadMutexUnlock(&pMeta->backendMutex); - return code; } int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) { @@ -1232,20 +1506,131 @@ int64_t taskGetDBRef(void* arg) { return pDb->refId; } -int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { +int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) { + TdFilePtr pFile = NULL; + int32_t code = 0; + + char buf[256] = {0}; + int32_t nBytes = 0; + + int32_t len = strlen(pChkpIdDir); + if (len == 0) { + code = TSDB_CODE_INVALID_PARA; + stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code)); + return code; + } + + int32_t cap = len + 64; + char* pDst = taosMemoryCalloc(1, cap); + if (pDst == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir); + goto _EXIT; + } + + nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + stError("failed to build dst to load extra info, dir:%s", pChkpIdDir); + goto _EXIT; + } 
+ + pFile = taosOpenFile(pDst, TD_FILE_READ); + if (pFile == NULL) { + // compatible with previous version + *processId = -1; + code = 0; + stError("failed to open file to load extra info, file:%s, reason:%s", pDst, tstrerror(TAOS_SYSTEM_ERROR(errno))); + goto _EXIT; + } + + if (taosReadFile(pFile, buf, sizeof(buf) - 1) <= 0) { /* keep the last byte as NUL so sscanf sees a terminated string */ + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(code)); + goto _EXIT; + } + + if (sscanf(buf, "%" PRId64 " %" PRId64 "", chkpId, processId) < 2) { + code = TSDB_CODE_INVALID_PARA; + stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(code)); + goto _EXIT; + } + code = 0; +_EXIT: + taosMemoryFree(pDst); + taosCloseFile(&pFile); + return code; +} +int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { + int32_t code = 0; + + TdFilePtr pFile = NULL; + + char buf[256] = {0}; + int32_t nBytes = 0; + + int32_t len = strlen(pChkpIdDir); + if (len == 0) { + code = TSDB_CODE_INVALID_PARA; + stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code)); + return code; + } + + int32_t cap = len + 64; + char* pDst = taosMemoryCalloc(1, cap); + if (pDst == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir); + goto _EXIT; + } + + nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + stError("failed to build dst to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(code)); + goto _EXIT; + } + + pFile = taosOpenFile(pDst, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to open file to add extra info, file:%s, reason:%s", pDst, tstrerror(code)); + goto _EXIT; + } + + nBytes = snprintf(buf, sizeof(buf), "%" 
PRId64 " %" PRId64 "", chkpId, processId); + if (nBytes <= 0 || 
nBytes >= sizeof(buf)) { + code = TSDB_CODE_OUT_OF_RANGE; + stError("failed to build content to add extra info, dir:%s,reason:%s", pChkpIdDir, tstrerror(code)); + goto _EXIT; + } + + if (nBytes != taosWriteFile(pFile, buf, nBytes)) { + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(code)); + goto _EXIT; + } + code = 0; + +_EXIT: + taosCloseFile(&pFile); + taosMemoryFree(pDst); + return code; +} +int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) { STaskDbWrapper* pTaskDb = arg; int64_t st = taosGetTimestampMs(); - int32_t code = -1; + int32_t code = 0; int64_t refId = pTaskDb->refId; if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { - return -1; + code = terrno; + return code; } char* pChkpDir = NULL; char* pChkpIdDir = NULL; - if (chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { - code = -1; + if ((code = chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir)) < 0) { goto _EXIT; } // Get all cf and acquire cfWrappter @@ -1256,32 +1641,58 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { int64_t written = atomic_load_64(&pTaskDb->dataWritten); + // flush db if (written > 0) { stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf); + if (code != 0) goto _EXIT; } else { stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); } + + // do checkpoint if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); + goto _EXIT; } else { stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, taosGetTimestampMs() - st); } + // add extra info to checkpoint + if ((code = chkpAddExtraInfo(pChkpIdDir, chkpId, processId)) != 0) { + stError("stream backend:%p 
failed to add extra info to checkpoint at:%s", pTaskDb, pChkpIdDir); + goto _EXIT; + } + + // delete ttl checkpoint code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir); + if (code < 0) { + goto _EXIT; + } + atomic_store_64(&pTaskDb->dataWritten, 0); pTaskDb->chkpId = chkpId; _EXIT: - taosMemoryFree(pChkpDir); + + // clear checkpoint dir if failed + if (code != 0 && pChkpDir != NULL) { + if (taosDirExist(pChkpIdDir)) { + taosRemoveDir(pChkpIdDir); + } + } taosMemoryFree(pChkpIdDir); + taosMemoryFree(pChkpDir); + taosReleaseRef(taskDbWrapperId, refId); taosMemoryFree(ppCf); return code; } -int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskDbDoCheckpoint(arg, chkpId); } +int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId, int64_t processVer) { + return taskDbDoCheckpoint(arg, chkpId, processVer); +} SListNode* streamBackendAddCompare(void* backend, void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)backend; @@ -1659,7 +2070,9 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { len += taosEncodeFixedI32((void**)&buf, key.len); len += taosEncodeFixedI32((void**)&buf, key.rawLen); len += taosEncodeFixedI8((void**)&buf, key.compress); - len += taosEncodeBinary((void**)&buf, (char*)value, key.len); + if (value != NULL && key.len != 0) { + len += taosEncodeBinary((void**)&buf, (char*)value, key.len); + } *dest = p; } else { char* buf = *dest; @@ -1667,7 +2080,9 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) { len += taosEncodeFixedI32((void**)&buf, key.len); len += taosEncodeFixedI32((void**)&buf, key.rawLen); len += taosEncodeFixedI8((void**)&buf, key.compress); - len += taosEncodeBinary((void**)&buf, (char*)value, key.len); + if (value != NULL && key.len != 0) { + len += taosEncodeBinary((void**)&buf, (char*)value, key.len); + } } taosMemoryFree(dst); @@ -2005,12 +2420,16 @@ void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) { int32_t taskDbBuildFullPath(char* path, char* key, 
char** dbFullPath, char** stateFullPath) { int32_t code = 0; + char* statePath = taosMemoryCalloc(1, strlen(path) + 128); + if (statePath == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - char* statePath = taosMemoryCalloc(1, strlen(path) + 128); sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key); if (!taosDirExist(statePath)) { code = taosMulMkDir(statePath); if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); stError("failed to create dir: %s, reason:%s", statePath, tstrerror(code)); taosMemoryFree(statePath); return code; @@ -2018,10 +2437,16 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta } char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128); + if (dbPath == NULL) { + taosMemoryFree(statePath); + return TSDB_CODE_OUT_OF_MEMORY; + } + sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state"); if (!taosDirExist(dbPath)) { code = taosMulMkDir(dbPath); if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code)); taosMemoryFree(statePath); taosMemoryFree(dbPath); @@ -2099,15 +2524,32 @@ _EXIT: return NULL; } -STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId) { +STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer) { char* statePath = NULL; char* dbPath = NULL; - - if (restoreCheckpointData(path, key, chkptId, &statePath, &dbPath) != 0) { + int code = 0; + terrno = 0; + if ((code = restoreCheckpointData(path, key, chkptId, &statePath, &dbPath, processVer)) < 0) { + terrno = code; + stError("failed to restore checkpoint data, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, + chkptId, tstrerror(terrno)); return NULL; } STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); + if (pTaskDb != NULL) { + int64_t chkpId = -1, ver = -1; + if ((code = chkpLoadExtraInfo(dbPath, &chkpId, &ver) == 0)) { + *processVer = ver; + } else { + terrno = code; + stError("failed to 
load extra info, path:%s, key:%s, checkpointId: %" PRId64 "reason:%s", path, key, chkptId, + tstrerror(terrno)); + taskDbDestroy(pTaskDb, false); + return NULL; + } + } + taosMemoryFree(dbPath); taosMemoryFree(statePath); return pTaskDb; @@ -2194,15 +2636,31 @@ void taskDbDestroy(void* pDb, bool flush) { void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { - int32_t code = -1; + int32_t code = 0; int64_t refId = pDb->refId; + int32_t nBytes = 0; if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { - return -1; + code = terrno; + return code; + } + + int32_t cap = strlen(pDb->path) + 128; + + char* buf = taosMemoryCalloc(1, cap); + if (buf == NULL) { + taosReleaseRef(taskDbWrapperId, refId); + return TSDB_CODE_OUT_OF_MEMORY; + } + + nBytes = + snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(buf); + taosReleaseRef(taskDbWrapperId, refId); + return TSDB_CODE_OUT_OF_RANGE; } - char* buf = taosMemoryCalloc(1, strlen(pDb->path) + 128); - sprintf(buf, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); if (taosIsDir(buf)) { code = 0; *path = buf; @@ -2217,15 +2675,28 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) { int32_t code = 0; + int32_t cap = strlen(pDb->path) + 32; SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; - char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32); - sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); + char* temp = taosMemoryCalloc(1, cap); + if (temp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t nBytes = snprintf(temp, cap, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); + if 
(nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(temp); + return TSDB_CODE_OUT_OF_RANGE; + } if (taosDirExist(temp)) { cleanDir(temp, idStr); } else { - taosMkDir(temp); + code = taosMkDir(temp); + if (code != 0) { + taosMemoryFree(temp); + return TAOS_SYSTEM_ERROR(errno); + } } code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp); @@ -2321,7 +2792,8 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) { int32_t code = 0; - STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0); + int64_t processVer = -1; + STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0, &processVer); RocksdbCfInst* pSrcBackend = pCfInst; for (int i = 0; i < nCf; i++) { @@ -3955,6 +4427,9 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { char* name = taosHashGetKey(pIter, &len); if (!isBkdDataMeta(name, len) && !taosHashGet(p1, name, len)) { char* fname = taosMemoryCalloc(1, len + 1); + if (fname == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } strncpy(fname, name, len); taosArrayPush(diff, &fname); } @@ -3966,7 +4441,9 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { int32_t code = 0; code = compareHashTableImpl(p1, p2, add); - code = compareHashTableImpl(p2, p1, del); + if (code != 0) { + code = compareHashTableImpl(p2, p1, del); + } return code; } @@ -4007,26 +4484,29 @@ void strArrayDebugInfo(SArray* pArr, char** buf) { *buf = p; } void dbChkpDebugInfo(SDbChkp* pDb) { - // stTrace("chkp get file list: curr"); - char* p[4] = {NULL}; + if (stDebugFlag & DEBUG_INFO) { + char* p[4] = {NULL}; - hashTableToDebug(pDb->pSstTbl[pDb->idx], &p[0]); - stTrace("chkp previous file: [%s]", p[0]); + hashTableToDebug(pDb->pSstTbl[pDb->idx], &p[0]); + stTrace("chkp previous file: [%s]", p[0]); - hashTableToDebug(pDb->pSstTbl[1 - pDb->idx], &p[1]); - stTrace("chkp curr file: [%s]", p[1]); + hashTableToDebug(pDb->pSstTbl[1 - pDb->idx], &p[1]); + stTrace("chkp curr file: [%s]", p[1]); - strArrayDebugInfo(pDb->pAdd, &p[2]); - 
stTrace("chkp newly addded file: [%s]", p[2]); + strArrayDebugInfo(pDb->pAdd, &p[2]); + stTrace("chkp newly addded file: [%s]", p[2]); - strArrayDebugInfo(pDb->pDel, &p[3]); - stTrace("chkp newly deleted file: [%s]", p[3]); + strArrayDebugInfo(pDb->pDel, &p[3]); + stTrace("chkp newly deleted file: [%s]", p[3]); - for (int i = 0; i < 4; i++) { - taosMemoryFree(p[i]); + for (int i = 0; i < 4; i++) { + taosMemoryFree(p[i]); + } } } int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { + int32_t code = 0; + int32_t nBytes; taosThreadRwlockWrlock(&p->rwLock); p->preCkptId = p->curChkpId; @@ -4041,13 +4521,24 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { int32_t sstLen = strlen(pSST); memset(p->buf, 0, p->len); - sprintf(p->buf, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + + nBytes = + snprintf(p->buf, p->len, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + if (nBytes <= 0 || nBytes >= p->len) { + taosThreadRwlockUnlock(&p->rwLock); + return TSDB_CODE_OUT_OF_RANGE; + } taosArrayClearP(p->pAdd, taosMemoryFree); taosArrayClearP(p->pDel, taosMemoryFree); taosHashClear(p->pSstTbl[1 - p->idx]); - TdDirPtr pDir = taosOpenDir(p->buf); + TdDirPtr pDir = taosOpenDir(p->buf); + if (pDir == NULL) { + taosThreadRwlockUnlock(&p->rwLock); + return TAOS_SYSTEM_ERROR(errno); + } + TdDirEntryPtr de = NULL; int8_t dummy = 0; while ((de = taosReadDir(pDir)) != NULL) { @@ -4055,23 +4546,36 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { taosMemoryFreeClear(p->pCurrent); + p->pCurrent = taosStrdup(name); - // taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); + if (p->pCurrent == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } continue; } if (strlen(name) >= maniLen && strncmp(name, 
pManifest, maniLen) == 0) { taosMemoryFreeClear(p->pManifest); p->pManifest = taosStrdup(name); - // taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); + if (p->pManifest == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } continue; } if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { - taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); + if (taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)) != 0) { + break; + } continue; } } taosCloseDir(&pDir); + if (code != 0) { + taosThreadRwlockUnlock(&p->rwLock); + return code; + } if (p->init == 0) { void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); @@ -4080,6 +4584,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { char* name = taosHashGetKey(pIter, &len); if (name != NULL && !isBkdDataMeta(name, len)) { char* fname = taosMemoryCalloc(1, len + 1); + if (fname == NULL) { + taosThreadRwlockUnlock(&p->rwLock); + return TSDB_CODE_OUT_OF_MEMORY; + } + strncpy(fname, name, len); taosArrayPush(p->pAdd, &fname); } @@ -4115,34 +4624,78 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { taosThreadRwlockUnlock(&p->rwLock); - return 0; + return code; } +void dbChkpDestroy(SDbChkp* pChkp); + SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + p->curChkpId = initChkpId; p->preCkptId = -1; p->pSST = taosArrayInit(64, sizeof(void*)); + if (p->pSST == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + dbChkpDestroy(p); + return NULL; + } + p->path = path; p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); + if (p->buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } p->idx = 0; p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (p->pSstTbl[0] == 
NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (p->pSstTbl[1] == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } p->pAdd = taosArrayInit(64, sizeof(void*)); + if (p->pAdd == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + p->pDel = taosArrayInit(64, sizeof(void*)); + if (p->pDel == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _EXIT; + } + p->update = 0; taosThreadRwlockInit(&p->rwLock, NULL); SArray* list = NULL; int32_t code = dbChkpGetDelta(p, initChkpId, list); + if (code != 0) { + goto _EXIT; + } return p; +_EXIT: + dbChkpDestroy(p); + return NULL; } void dbChkpDestroy(SDbChkp* pChkp) { + if (pChkp == NULL) return; + taosMemoryFree(pChkp->buf); taosMemoryFree(pChkp->path); @@ -4164,35 +4717,71 @@ int32_t dbChkpInit(SDbChkp* p) { } #endif int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { + static char* chkpMeta = "META"; + int32_t code = 0; + taosThreadRwlockRdlock(&p->rwLock); - int32_t code = -1; - int32_t len = p->len + 128; - char* srcBuf = taosMemoryCalloc(1, len); - char* dstBuf = taosMemoryCalloc(1, len); + int32_t cap = p->len + 128; - char* srcDir = taosMemoryCalloc(1, len); - char* dstDir = taosMemoryCalloc(1, len); + char* buffer = taosMemoryCalloc(4, cap); + if (buffer == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } - sprintf(srcDir, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId); - sprintf(dstDir, "%s", dname); + char* srcBuf = buffer; + char* dstBuf = &srcBuf[cap]; + char* srcDir = &dstBuf[cap]; + char* dstDir = &srcDir[cap]; + + int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", p->curChkpId); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstDir, cap, "%s", dname); + if 
(nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } if (!taosDirExist(srcDir)) { stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); + code = TSDB_CODE_INVALID_PARA; + goto _ERROR; + } + int64_t chkpId = 0, processId = -1; + code = chkpLoadExtraInfo(srcDir, &chkpId, &processId); + if (code < 0) { + stError("failed to load extra info from %s, reason:%s", srcDir, code != 0 ? "unkown" : tstrerror(code)); + goto _ERROR; } // add file to $name dir for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) { - memset(srcBuf, 0, len); - memset(dstBuf, 0, len); + memset(srcBuf, 0, cap); + memset(dstBuf, 0, cap); char* filename = taosArrayGetP(p->pAdd, i); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); + nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, filename); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, filename); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } if (taosCopyFile(srcBuf, dstBuf) < 0) { - stError("failed to copy file from %s to %s", srcBuf, dstBuf); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code)); goto _ERROR; } } @@ -4200,44 +4789,84 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { for (int i = 0; i < taosArrayGetSize(p->pDel); i++) { char* filename = taosArrayGetP(p->pDel, i); char* p = taosStrdup(filename); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } taosArrayPush(list, &p); } // copy current file to dst dir - memset(srcBuf, 0, len); - memset(dstBuf, 0, len); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); - sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId); + memset(srcBuf, 0, cap); + memset(dstBuf, 0, cap); + + 
nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + if (taosCopyFile(srcBuf, dstBuf) < 0) { - stError("failed to copy file from %s to %s", srcBuf, dstBuf); + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code)); goto _ERROR; } // copy manifest file to dst dir - memset(srcBuf, 0, len); - memset(dstBuf, 0, len); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); - sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId); - if (taosCopyFile(srcBuf, dstBuf) < 0) { - stError("failed to copy file from %s to %s", srcBuf, dstBuf); + memset(srcBuf, 0, cap); + memset(dstBuf, 0, cap); + + nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; goto _ERROR; } - static char* chkpMeta = "META"; - memset(dstBuf, 0, len); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta); - memcpy(dstDir, dstBuf, strlen(dstBuf)); + nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + if (taosCopyFile(srcBuf, dstBuf) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(code)); + goto _ERROR; + } + memset(dstBuf, 0, cap); + nBytes = snprintf(dstDir, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == 
NULL) { - stError("chkp failed to create meta file: %s", dstDir); + code = TAOS_SYSTEM_ERROR(errno); + stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code)); goto _ERROR; } - char content[128] = {0}; - snprintf(content, sizeof(content), "%s_%" PRId64 "\n%s_%" PRId64 "", p->pCurrent, p->curChkpId, p->pManifest, - p->curChkpId); - if (taosWriteFile(pFile, content, strlen(content)) <= 0) { - stError("chkp failed to write meta file: %s", dstDir); + + char content[256] = {0}; + nBytes = snprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId, + "processVer", processId); + if (nBytes <= 0 || nBytes >= sizeof(content)) { + code = TSDB_CODE_OUT_OF_RANGE; + stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir); + taosCloseFile(&pFile); + goto _ERROR; + } + + nBytes = taosWriteFile(pFile, content, strlen(content)); + if (nBytes != strlen(content)) { + code = TAOS_SYSTEM_ERROR(errno); + stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(code)); taosCloseFile(&pFile); goto _ERROR; } @@ -4249,18 +4878,39 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { code = 0; _ERROR: + taosMemoryFree(buffer); taosThreadRwlockUnlock(&p->rwLock); - taosMemoryFree(srcBuf); - taosMemoryFree(dstBuf); - taosMemoryFree(srcDir); - taosMemoryFree(dstDir); return code; } + SBkdMgt* bkdMgtCreate(char* path) { + terrno = 0; SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (p->pDbChkpTbl == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + bkdMgtDestroy(p); + return NULL; + } + p->path = taosStrdup(path); - taosThreadRwlockInit(&p->rwLock, NULL); + if (p->path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + bkdMgtDestroy(p); + return NULL; + } + + if 
(taosThreadRwlockInit(&p->rwLock, NULL) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + bkdMgtDestroy(p); + return NULL; + } + return p; } @@ -4282,28 +4932,52 @@ void bkdMgtDestroy(SBkdMgt* bm) { } int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) { int32_t code = 0; - taosThreadRwlockWrlock(&bm->rwLock); SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL; if (pChkp == NULL) { - char* path = taosMemoryCalloc(1, strlen(bm->path) + 64); - sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId); + int32_t cap = strlen(bm->path) + 64; + char* path = taosMemoryCalloc(1, cap); + if (path == NULL) { + taosThreadRwlockUnlock(&bm->rwLock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(path); + taosThreadRwlockUnlock(&bm->rwLock); + code = TSDB_CODE_OUT_OF_RANGE; + return code; + } SDbChkp* p = dbChkpCreate(path, chkpId); - taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)); + if (p == NULL) { + taosMemoryFree(path); + taosThreadRwlockUnlock(&bm->rwLock); + code = terrno; + return code; + } + + if (taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)) != 0) { + dbChkpDestroy(p); + taosThreadRwlockUnlock(&bm->rwLock); + code = terrno; + return code; + } pChkp = p; - code = dbChkpDumpTo(pChkp, dname, list); taosThreadRwlockUnlock(&bm->rwLock); return code; + } else { + code = dbChkpGetDelta(pChkp, chkpId, NULL); + if (code == 0) { + code = dbChkpDumpTo(pChkp, dname, list); + } } - code = dbChkpGetDelta(pChkp, chkpId, NULL); - code = dbChkpDumpTo(pChkp, dname, list); - taosThreadRwlockUnlock(&bm->rwLock); return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cdb5bf0b50..a3a7035905 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ 
b/source/libs/stream/src/streamCheckpoint.c @@ -57,6 +57,13 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint pBlock->info.childId = pTask->info.selfChildId; pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock; + if (pChkpoint->blocks == NULL) { + taosMemoryFree(pBlock); + taosFreeQitem(pChkpoint); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + taosArrayPush(pChkpoint->blocks, pBlock); taosMemoryFree(pBlock); @@ -112,7 +119,12 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pRpcInfo, int32_t code) { int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); - void* pBuf = rpcMallocCont(size); + + void* pBuf = rpcMallocCont(size); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); @@ -133,6 +145,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); + return 0; } @@ -522,65 +535,57 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { - char buf[128] = {0}; + int32_t code = 0; + int32_t cap = strlen(path) + 64; - char* file = taosMemoryCalloc(1, strlen(path) + 32); - sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); + char* filePath = taosMemoryCalloc(1, cap); + if (filePath == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - int32_t code = downloadCheckpointDataByName(id, "META", file); + int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP"); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(filePath); + return TSDB_CODE_OUT_OF_RANGE; + } + + code = downloadCheckpointDataByName(id, 
"META", filePath); if (code != 0) { - stDebug("%s chkp failed to download meta file:%s", id, file); - taosMemoryFree(file); + stError("%s chkp failed to download meta file:%s", id, filePath); + taosMemoryFree(filePath); return code; } - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); - if (pFile == NULL) { - stError("%s failed to open meta file:%s for checkpoint", id, file); - code = -1; - return code; + code = remoteChkpGetDelFile(filePath, list); + if (code != 0) { + stError("%s chkp failed to get to del:%s", id, filePath); + taosMemoryFree(filePath); } - - if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { - stError("%s failed to read meta file:%s for checkpoint", id, file); - code = -1; - } else { - int32_t len = strnlen(buf, tListLen(buf)); - for (int i = 0; i < len; i++) { - if (buf[i] == '\n') { - char* item = taosMemoryCalloc(1, i + 1); - memcpy(item, buf, i); - taosArrayPush(list, &item); - - item = taosMemoryCalloc(1, len - i); - memcpy(item, buf + i + 1, len - i - 1); - taosArrayPush(list, &item); - } - } - } - - taosCloseFile(&pFile); - taosRemoveFile(file); - taosMemoryFree(file); - return code; + return 0; } int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) { - char* path = NULL; - int32_t code = 0; - SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); - int64_t now = taosGetTimestampMs(); + int32_t code = 0; + char* path = NULL; + SStreamMeta* pMeta = pTask->pMeta; const char* idStr = pTask->id.idStr; + int64_t now = taosGetTimestampMs(); + + SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); + if (toDelFiles == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, pTask->id.idStr)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId); + stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, 
tstrerror(code)); } if (type == DATA_UPLOAD_S3) { if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId); + stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId, + tstrerror(code)); } } @@ -589,7 +594,8 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d if (code == TSDB_CODE_SUCCESS) { stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId); } else { - stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path); + stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path, + tstrerror(code)); } } @@ -662,7 +668,8 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (pTask->info.taskLevel != TASK_LEVEL__SINK) { stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); - code = streamBackendDoCheckpoint(pTask->pBackend, ckId); + int64_t ver = pTask->chkInfo.processedVer; + code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); } @@ -770,6 +777,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { SArray* pList = pTask->upstreamInfo.pList; ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); + if (pNotSendList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); + return; + } for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); @@ -976,52 +988,77 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, 
int32_t vgId) { } static int32_t uploadCheckpointToS3(const char* id, const char* path) { + int32_t code = 0; + int32_t nBytes = 0; + + if (s3Init() != 0) { + return TSDB_CODE_THIRDPARTY_ERROR; + } + TdDirPtr pDir = taosOpenDir(path); - if (pDir == NULL) return -1; + if (pDir == NULL) { + return TAOS_SYSTEM_ERROR(errno); + } TdDirEntryPtr de = NULL; - s3Init(); while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue; char filename[PATH_MAX] = {0}; if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) { - snprintf(filename, sizeof(filename), "%s%s", path, name); + nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name); + if (nBytes <= 0 || nBytes >= sizeof(filename)) { + code = TSDB_CODE_OUT_OF_RANGE; + break; + } } else { - snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); + nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); + if (nBytes <= 0 || nBytes >= sizeof(filename)) { + code = TSDB_CODE_OUT_OF_RANGE; + break; + } } char object[PATH_MAX] = {0}; - snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); - - if (s3PutObjectFromFile2(filename, object, 0) != 0) { - taosCloseDir(&pDir); - return -1; + nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); + if (nBytes <= 0 || nBytes >= sizeof(object)) { + code = TSDB_CODE_OUT_OF_RANGE; + break; } - stDebug("[s3] upload checkpoint:%s", filename); - // break; - } + code = s3PutObjectFromFile2(filename, object, 0); + if (code != 0) { + stError("[s3] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code)); + } else { + stDebug("[s3] upload checkpoint:%s", filename); + } + } taosCloseDir(&pDir); - return 0; + return code; } int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { - int32_t code = 0; - char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); + int32_t 
nBytes; + int32_t cap = strlen(id) + strlen(dstName) + 16; + + char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { - code = terrno = TSDB_CODE_OUT_OF_MEMORY; - return code; + return TSDB_CODE_OUT_OF_MEMORY; } - sprintf(buf, "%s/%s", id, fname); - if (s3GetObjectToFile(buf, dstName) != 0) { - code = errno; + nBytes = snprintf(buf, cap, "%s/%s", id, fname); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(buf); + return TSDB_CODE_OUT_OF_RANGE; + } + int32_t code = s3GetObjectToFile(buf, dstName); + if (code != 0) { + taosMemoryFree(buf); + return TAOS_SYSTEM_ERROR(errno); } - taosMemoryFree(buf); - return code; + return 0; } ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { @@ -1035,13 +1072,17 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { } int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { + int32_t code = 0; if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("invalid parameters in upload checkpoint, %s", id); - return -1; + return TSDB_CODE_INVALID_CFG; } if (strlen(tsSnodeAddress) != 0) { - return uploadByRsync(id, path); + code = uploadByRsync(id, path); + if (code != 0) { + return TAOS_SYSTEM_ERROR(errno); + } } else if (tsS3StreamEnabled) { return uploadCheckpointToS3(id, path); } @@ -1053,7 +1094,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { stError("down load checkpoint data parameters invalid"); - return -1; + return TSDB_CODE_INVALID_PARA; } if (strlen(tsSnodeAddress) != 0) { @@ -1083,7 +1124,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path) { int32_t deleteCheckpoint(const char* id) { if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); - return -1; + return 
TSDB_CODE_INVALID_PARA; } if (strlen(tsSnodeAddress) != 0) { return deleteRsync(id); @@ -1095,11 +1136,18 @@ int32_t deleteCheckpoint(const char* id) { int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; - snprintf(object, sizeof(object), "%s/%s", id, name); - char* tmp = object; - s3DeleteObjects((const char**)&tmp, 1); - return 0; + int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name); + if (nBytes <= 0 || nBytes >= sizeof(object)) { + return TSDB_CODE_OUT_OF_RANGE; + } + + char* tmp = object; + int32_t code = s3DeleteObjects((const char**)&tmp, 1); + if (code != 0) { + return TSDB_CODE_THIRDPARTY_ERROR; + } + return code; } int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { @@ -1134,14 +1182,14 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code); if (code < 0) { stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code)); - return -1; + return TSDB_CODE_INVALID_MSG; } void* buf = rpcMallocCont(tlen); if (buf == NULL) { stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } SEncoder encoder; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e7fdb7ae2a..fa3a3ea07d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -182,9 +182,10 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { int32_t code = 0; int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta); - - bool exist = streamBackendDataIsExist(pMeta->path, chkpId, pMeta->vgId); + terrno = 0; + bool exist = streamBackendDataIsExist(pMeta->path, chkpId); if (exist == false) { + code = terrno; return code; } @@ -252,8 +253,9 @@ int32_t 
streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) } STaskDbWrapper* pBackend = NULL; + int64_t processVer = -1; while (1) { - pBackend = taskDbOpen(pMeta->path, key, chkpId); + pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer); if (pBackend != NULL) { break; } @@ -271,6 +273,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) pBackend->pTask = pTask; pBackend->pMeta = pMeta; + if (processVer != -1) pTask->chkInfo.processedVer = processVer; + taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexUnlock(&pMeta->backendMutex); @@ -308,7 +312,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas } if (streamMetaMayCvtDbFormat(pMeta) < 0) { - stError("vgId:%d convert sub info format failed, open stream meta failed", pMeta->vgId); + stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId, + tstrerror(terrno)); goto _err; } @@ -393,6 +398,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); pMeta->bkdChkptMgt = bkdMgtCreate(tpath); + if (pMeta->bkdChkptMgt == NULL) { + goto _err; + } taosThreadMutexInit(&pMeta->backendMutex, NULL); return pMeta; @@ -408,9 +416,10 @@ _err: if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet); + if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt); taosMemoryFree(pMeta); - stError("failed to open stream meta"); + stError("failed to open stream meta, reason:%s", tstrerror(terrno)); return NULL; } @@ -900,7 +909,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (p == NULL) { code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); if (code < 
0) { - stError("failed to load s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); + stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); continue; } @@ -985,7 +994,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount); stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", - vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); + vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); // wait for the stream meta hb function stopping streamMetaWaitForHbTmrQuit(pMeta); @@ -1171,7 +1180,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int64_t now = taosGetTimestampMs(); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%"PRId64, vgId, numOfTasks, now); + stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now); if (numOfTasks == 0) { stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 0871ff5eb7..02e4ed8d8b 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -24,6 +24,7 @@ enum SBackendFileType { ROCKSDB_SST_TYPE = 3, ROCKSDB_CURRENT_TYPE = 4, ROCKSDB_CHECKPOINT_META_TYPE = 5, + ROCKSDB_CHECKPOINT_SELFCHECK_TYPE = 6, }; typedef struct SBackendFileItem { @@ -49,6 +50,7 @@ typedef struct SBackendSnapFiles2 { char* pOptions; SArray* pSst; char* pCheckpointMeta; + char* pCheckpointSelfcheck; char* path; int64_t checkpointId; @@ -111,6 +113,7 @@ const char* ROCKSDB_MAINFEST = "MANIFEST"; const char* ROCKSDB_SST = "sst"; const char* ROCKSDB_CURRENT = "CURRENT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; +const char* 
ROCKSDB_CHECKPOINT_SELF_CHECK = "info"; static int64_t kBlockSize = 64 * 1024; int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta); @@ -127,6 +130,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { int32_t ret = 0; char* fullname = taosMemoryCalloc(1, strlen(path) + 32); + sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); ret = taosStatFile(fullname, sz, NULL, NULL); @@ -148,8 +152,20 @@ int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDest void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { - char* buf = taosMemoryCalloc(1, 512); - sprintf(buf + strlen(buf), "["); + int16_t cap = 512; + + char* buf = taosMemoryCalloc(1, cap); + if (buf == NULL) { + stError("%s failed to alloc memory, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + + int32_t nBytes = snprintf(buf + strlen(buf), cap, "["); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(buf); + stError("%s failed to write buf, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_RANGE)); + return; + } if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent); if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest); @@ -157,10 +173,10 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (pSnapFile->pSst) { for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { char* name = taosArrayGetP(pSnapFile->pSst, i); - sprintf(buf + strlen(buf), "%s,", name); + if (strlen(buf) + strlen(name) < cap) sprintf(buf + strlen(buf), "%s,", name); } } - sprintf(buf + strlen(buf) - 1, "]"); + if ((strlen(buf)) < cap) sprintf(buf + strlen(buf) - 1, "]"); stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, pSnapFile->snapInfo.taskId, buf); @@ -199,16 +215,25 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) { // meta item.name = 
pSnapFile->pCheckpointMeta; item.type = ROCKSDB_CHECKPOINT_META_TYPE; + if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) { + taosArrayPush(pSnapFile->pFileList, &item); + } + + item.name = pSnapFile->pCheckpointSelfcheck; + item.type = ROCKSDB_CHECKPOINT_SELFCHECK_TYPE; + if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) { taosArrayPush(pSnapFile->pFileList, &item); } return 0; } int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { + int32_t code = 0; TdDirPtr pDir = taosOpenDir(pSnapFile->path); if (NULL == pDir) { - stError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path); - return -1; + code = TAOS_SYSTEM_ERROR(errno); + stError("%s failed to open %s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, tstrerror(code)); + return code; } TdDirEntryPtr pDirEntry; @@ -216,43 +241,88 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { char* name = taosGetDirEntryName(pDirEntry); if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) { pSnapFile->pCurrent = taosStrdup(name); + if (pSnapFile->pCurrent == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } continue; } if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) { pSnapFile->pMainfest = taosStrdup(name); + if (pSnapFile->pMainfest == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } continue; } if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) { pSnapFile->pOptions = taosStrdup(name); + if (pSnapFile->pOptions == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } continue; } if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) && 0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) { pSnapFile->pCheckpointMeta = taosStrdup(name); + if (pSnapFile->pCheckpointMeta == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + continue; + } + if 
(strlen(name) >= strlen(ROCKSDB_CHECKPOINT_SELF_CHECK) && + 0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) { + pSnapFile->pCheckpointSelfcheck = taosStrdup(name); + if (pSnapFile->pCheckpointSelfcheck == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } continue; } if (strlen(name) >= strlen(ROCKSDB_SST) && 0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { char* sst = taosStrdup(name); + if (sst == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } taosArrayPush(pSnapFile->pSst, &sst); } } taosCloseDir(&pDir); - return 0; + return code; } int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { - int32_t code = -1; + int32_t code = 0; + int32_t nBytes = 0; + int32_t cap = strlen(pSnap->dbPrefixPath) + 256; + + char* path = taosMemoryCalloc(1, cap); + if (path == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + nBytes = snprintf(path, cap, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", pSnap->chkpId); + if (nBytes <= 0 || nBytes >= cap) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } - char* path = taosMemoryCalloc(1, strlen(pSnap->dbPrefixPath) + 256); - // char idstr[64] = {0}; - sprintf(path, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", - pSnap->chkpId); if (!taosIsDir(path)) { + code = TSDB_CODE_INVALID_MSG; goto _ERROR; } pSnapFile->pSst = taosArrayInit(16, sizeof(void*)); pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); + if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } + pSnapFile->path = path; pSnapFile->snapInfo = *pSnap; if ((code = snapFileReadMeta(pSnapFile)) != 0) { @@ -264,7 +334,6 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke snapFileDebugInfo(pSnapFile); path = NULL; - code = 0; 
_ERROR: taosMemoryFree(path); @@ -276,6 +345,7 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { taosMemoryFree(pSnap->pMainfest); taosMemoryFree(pSnap->pOptions); taosMemoryFree(pSnap->path); + taosMemoryFree(pSnap->pCheckpointSelfcheck); for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) { char* sst = taosArrayGetP(pSnap->pSst, i); taosMemoryFree(sst); @@ -295,14 +365,25 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { } int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { // impl later + int32_t code = 0; SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); - int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); + if (pSnapInfoSet == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); if (code != 0) { + stError("failed to do task db snap info, reason:%s", tstrerror(code)); taosArrayDestroy(pSnapInfoSet); - return -1; + return code; } SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); + if (pDbSnapSet == NULL) { + taosArrayDestroy(pSnapInfoSet); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i); @@ -318,6 +399,10 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta pHandle->currIdx = 0; pHandle->pMeta = pMeta; return 0; + +_err: + streamSnapHandleDestroy(pHandle); + return code; } void streamSnapHandleDestroy(SStreamSnapHandle* handle) { @@ -348,9 +433,10 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa return TSDB_CODE_OUT_OF_MEMORY; } - if (streamSnapHandleInit(&pReader->handle, (char*)path, pMeta) < 0) { + int32_t code = streamSnapHandleInit(&pReader->handle, (char*)path, pMeta); + if (code != 0) { taosMemoryFree(pReader); - return -1; + return code; } *ppReader = pReader; @@ -410,10 +496,10 @@ _NEXT: int64_t nread = 
taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); if (nread == -1) { taosMemoryFree(buf); - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, item->type, tstrerror(code)); - return -1; + return code; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, @@ -473,6 +559,7 @@ _NEXT: // SMetaSnapWriter ======================================== int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter) { // impl later + int32_t code = 0; SStreamSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SStreamSnapWriter)); if (pWriter == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -480,11 +567,27 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path SStreamSnapHandle* pHandle = &pWriter->handle; pHandle->currIdx = 0; + pHandle->metaPath = taosStrdup(path); + if (pHandle->metaPath == NULL) { + taosMemoryFree(pWriter); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); + if (pHandle->pDbSnapSet == NULL) { + streamSnapWriterClose(pWriter, 0); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } SBackendSnapFile2 snapFile = {0}; - taosArrayPush(pHandle->pDbSnapSet, &snapFile); + if (taosArrayPush(pHandle->pDbSnapSet, &snapFile) == NULL) { + streamSnapWriterClose(pWriter, 0); + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } *ppWriter = pWriter; return 0; @@ -506,7 +609,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t if (pSnapFile->fd == 0) { pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pSnapFile->fd == NULL) { - code = 
TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP, pHdr->name, tstrerror(code)); } @@ -514,7 +617,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); if (bytes != pHdr->size) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); return code; } else { @@ -535,12 +638,16 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pSnapFile->fd == NULL) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP, pHdr->name, tstrerror(code)); } - taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); + if (taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset) != pHdr->size) { + code = TAOS_SYSTEM_ERROR(errno); + stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); + return code; + } stInfo("succ to write data %s", pItem->name); pSnapFile->offset += pHdr->size; } diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 2fb257fe4e..38d48a2a32 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -29,7 +29,7 @@ class BackendEnv : public ::testing::Test { void *backendCreate() { const char 
*streamPath = "/tmp"; - void * p = NULL; + void *p = NULL; // char *absPath = NULL; // // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2); @@ -52,7 +52,7 @@ SStreamState *stateCreate(const char *path) { } void *backendOpen() { streamMetaInit(); - const char * path = "/tmp/backend"; + const char *path = "/tmp/backend"; SStreamState *p = stateCreate(path); ASSERT(p != NULL); @@ -79,7 +79,7 @@ void *backendOpen() { const char *val = "value data"; int32_t len = 0; - char * newVal = NULL; + char *newVal = NULL; streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(len == strlen(val)); } @@ -100,7 +100,7 @@ void *backendOpen() { const char *val = "value data"; int32_t len = 0; - char * newVal = NULL; + char *newVal = NULL; int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(code != 0); } @@ -130,7 +130,7 @@ void *backendOpen() { winkey.groupId = 0; winkey.ts = tsArray[0]; - char * val = NULL; + char *val = NULL; int32_t len = 0; pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey); @@ -157,7 +157,7 @@ void *backendOpen() { key.ts = tsArray[i]; key.exprIdx = i; - char * val = NULL; + char *val = NULL; int32_t len = 0; streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); ASSERT(len == strlen("Value")); @@ -168,7 +168,7 @@ void *backendOpen() { key.ts = tsArray[i]; key.exprIdx = i; - char * val = NULL; + char *val = NULL; int32_t len = 0; streamStateFuncDel_rocksdb(p, &key); } @@ -213,7 +213,7 @@ void *backendOpen() { { SSessionKey key; memset(&key, 0, sizeof(key)); - char * val = NULL; + char *val = NULL; int32_t vlen = 0; code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); ASSERT(code == 0); @@ -260,7 +260,7 @@ void *backendOpen() { SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; key.groupId = (uint64_t)(i); key.ts = tsArray[i]; - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0); 
taosMemoryFreeClear(val); @@ -272,7 +272,7 @@ void *backendOpen() { SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key); ASSERT(pCurr != NULL); - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); ASSERT(vlen == strlen("Value")); @@ -296,7 +296,7 @@ void *backendOpen() { SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; key.groupId = (uint64_t)(i); key.ts = tsArray[i]; - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(streamStateFillDel_rocksdb(p, &key) == 0); taosMemoryFreeClear(val); @@ -338,7 +338,7 @@ void *backendOpen() { char key[128] = {0}; sprintf(key, "tbname_%d", i); - char * val = NULL; + char *val = NULL; int32_t len = 0; code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len); ASSERT(code == 0); @@ -354,7 +354,7 @@ TEST_F(BackendEnv, checkOpen) { SStreamState *p = (SStreamState *)backendOpen(); int64_t tsStart = taosGetTimestampMs(); { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; for (int i = 0; i < size; i++) { char key[128] = {0}; @@ -368,7 +368,7 @@ TEST_F(BackendEnv, checkOpen) { streamStateDestroyBatch(pBatch); } { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; char valBuf[256] = {0}; for (int i = 0; i < size; i++) { @@ -383,9 +383,9 @@ TEST_F(BackendEnv, checkOpen) { streamStateDestroyBatch(pBatch); } // do checkpoint 2 - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2); + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2, 0); { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; char valBuf[256] = {0}; for (int i = 0; i < size; i++) { @@ -400,17 +400,17 @@ TEST_F(BackendEnv, checkOpen) { streamStateDestroyBatch(pBatch); } - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3); + 
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3, 0); const char *path = "/tmp/backend/stream"; const char *dump = "/tmp/backend/stream/dump"; // taosMkDir(dump); taosMulMkDir(dump); SBkdMgt *mgt = bkdMgtCreate((char *)path); - SArray * result = taosArrayInit(4, sizeof(void *)); + SArray *result = taosArrayInit(4, sizeof(void *)); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); - taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4); + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); taosArrayClear(result); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index cf41887142..525e87ff2f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -98,6 +98,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space") +TAOS_DEFINE_ERROR(TSDB_CODE_THIRDPARTY_ERROR, "third party error, please check the log") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")