diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b39cfcef92..66ea5ea5c7 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -180,7 +180,6 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); -void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func); int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader *pReader); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index bc7d93674a..0cdfbdb50a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -309,10 +309,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con int32_t code = 0; while (1) { - code = walNextValidMsg(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + TAOS_CHECK_RETURN(walNextValidMsg(pReader)); SWalCont* pCont = &pReader->pHead->head; int64_t ver = pCont->version; diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 49295a8e5b..974d577c5b 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -166,6 +166,8 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized); int32_t walMetaDeserialize(SWal* pWal, const char* bytes); // meta section end +void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const char* func); + int64_t walGetSeq(); #ifdef __cplusplus diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 310258a55e..50cfada96b 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -75,30 +75,24 @@ int32_t walNextValidMsg(SWalReader *pReader) { wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); if (fetchVer > appliedVer) { - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; + TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } while (fetchVer <= appliedVer) { - if (walFetchHead(pReader, fetchVer) < 0) { - return -1; - } + TAOS_CHECK_RETURN(walFetchHead(pReader, fetchVer)); int32_t type = pReader->pHead->head.msgType; if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || (IS_META_MSG(type) && pReader->cond.scanMeta)) { - int32_t code = walFetchBody(pReader); - return (code == TSDB_CODE_SUCCESS) ? 0 : -1; + TAOS_RETURN(walFetchBody(pReader)); } else { - if (walSkipFetchBody(pReader) < 0) { - return -1; - } + TAOS_CHECK_RETURN(walSkipFetchBody(pReader)); fetchVer = pReader->curVersion; } } - return -1; + TAOS_RETURN(TSDB_CODE_FAILED); } int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } @@ -130,7 +124,7 @@ void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset) { } } -static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { +static int32_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0; TdFilePtr pIdxTFile = pReader->pIdxFile; @@ -140,32 +134,34 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET); if (ret < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, offset, terrstr()); - return -1; + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } SWalIdxEntry entry = {0}; if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) { if (ret < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr()); + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } else { - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %ld", pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry)); + + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } - return -1; } ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); if (ret < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, entry.offset, terrstr()); - return -1; + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } - return ret; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { @@ -177,9 +173,9 @@ static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { walBuildLogName(pReader->pWal, fileFirstVer, fnameStr); TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ); if (pLogFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr()); - return -1; + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } pReader->pLogFile = pLogFile; @@ -187,16 +183,16 @@ static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr); TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ); if (pIdxFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr()); - return -1; + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } pReader->pIdxFile = pIdxFile; pReader->curFileFirstVer = fileFirstVer; - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { @@ -208,47 +204,43 @@ static int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); if (pRet == NULL) { wError("failed to find WAL log file with ver:%" PRId64, ver); - terrno = TSDB_CODE_WAL_INVALID_VER; - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); } if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner - if (walReadChangeFile(pReader, pRet->firstVer) < 0) { - return -1; - } + TAOS_CHECK_RETURN(walReadChangeFile(pReader, pRet->firstVer)); } // error code was set inner - if (walReadSeekFilePos(pReader, pRet->firstVer, ver) < 0) { - return -1; - } + TAOS_CHECK_RETURN(walReadSeekFilePos(pReader, pRet->firstVer, ver)); wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, ver); pReader->curVersion = ver; - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { SWal *pWal = pReader->pWal; if (ver == pReader->curVersion) { wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } - if (walReadSeekVerImpl(pReader, ver) < 0) { - return -1; - } + TAOS_CHECK_RETURN(walReadSeekVerImpl(pReader, ver)); - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t walFetchHead(SWalReader *pRead, int64_t ver) { @@ -258,14 +250,12 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { // TODO: valid ver if (ver > pRead->pWal->vers.commitVer) { - return -1; + TAOS_RETURN(TSDB_CODE_FAILED); } if (pRead->curVersion != ver) { - code = walReaderSeekVer(pRead, ver); - if (code < 0) { - return -1; - } + TAOS_CHECK_RETURN(walReaderSeekVer(pRead, ver)); + seeked = true; } @@ -274,18 +264,16 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { - if (walReadSeekVerImpl(pRead, ver) < 0) { - return -1; - } + TAOS_CHECK_RETURN(walReadSeekVerImpl(pRead, ver)); + seeked = true; continue; } else { if (contLen < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } else { - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } - return -1; } } @@ -294,11 +282,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { if (code != 0) { wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%" PRIx64, pRead->pWal->cfg.vgId, ver, pRead->readerId); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t walSkipFetchBody(SWalReader *pRead) { @@ -315,12 +303,12 @@ int32_t walSkipFetchBody(SWalReader *pRead) { } int64_t code = taosLSeekFile(pRead->pLogFile, cryptedBodyLen, SEEK_CUR); if (code < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } pRead->curVersion++; - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t walFetchBody(SWalReader *pRead) { @@ -345,8 +333,7 @@ int32_t walFetchBody(SWalReader *pRead) { if (pRead->capacity < cryptedBodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + cryptedBodyLen); if (ptr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pRead->pHead = ptr; pReadHead = &pRead->pHead->head; @@ -355,23 +342,24 @@ int32_t walFetchBody(SWalReader *pRead) { if (cryptedBodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, cryptedBodyLen)) { if (plainBodyLen < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%" PRIx64, vgId, pReadHead->version, ver, tstrerror(terrno), id); + + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); } else { wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, 0x%" PRIx64, vgId, pReadHead->version, ver, id); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } - return -1; } if (pReadHead->version != ver) { wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64 ", 0x%" PRIx64, vgId, pReadHead->version, ver, id); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } decryptBody(&pRead->pWal->cfg, pRead->pHead, plainBodyLen, __FUNCTION__); @@ -379,40 +367,43 @@ int32_t walFetchBody(SWalReader *pRead) { if (walValidBodyCksum(pRead->pHead) != 0) { wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed, 0x%" PRIx64, vgId, ver, id); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } pRead->curVersion++; - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t walReadVer(SWalReader *pReader, int64_t ver) { wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver); int64_t contLen; - int32_t code; + int32_t code = 0; bool seeked = false; if (walIsEmpty(pReader->pWal)) { - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; + TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) { wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer); - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } taosThreadMutexLock(&pReader->mutex); if (pReader->curVersion != ver) { - if (walReaderSeekVer(pReader, ver) < 0) { + code = walReaderSeekVer(pReader, ver); + if (code) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); - return -1; + + TAOS_RETURN(code); } + seeked = true; } @@ -421,22 +412,24 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { - if (walReadSeekVerImpl(pReader, ver) < 0) { + code = walReadSeekVerImpl(pReader, ver); + if (code) { taosThreadMutexUnlock(&pReader->mutex); - return -1; + + TAOS_RETURN(code); } seeked = true; continue; } else { - if (contLen < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - } else { - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - } wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); - return -1; + + if (contLen < 0) { + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); + } else { + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + } } } @@ -444,9 +437,9 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { if (code != 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId, ver); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; taosThreadMutexUnlock(&pReader->mutex); - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } int32_t plainBodyLen = pReader->pHead->head.bodyLen; @@ -460,33 +453,33 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { if (pReader->capacity < cryptedBodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + cryptedBodyLen); if (ptr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosThreadMutexUnlock(&pReader->mutex); - return -1; + + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pReader->pHead = ptr; pReader->capacity = cryptedBodyLen; } if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, cryptedBodyLen)) != cryptedBodyLen) { - if (contLen < 0) - terrno = TAOS_SYSTEM_ERROR(errno); - else { - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - } wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); - return -1; + + if (contLen < 0) { + TAOS_RETURN(TAOS_SYSTEM_ERROR(errno)); + } else { + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + } } if (pReader->pHead->head.version != ver) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver); // pReader->curInvalid = 1; - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; taosThreadMutexUnlock(&pReader->mutex); - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } decryptBody(&pReader->pWal->cfg, pReader->pHead, plainBodyLen, __FUNCTION__); @@ -499,15 +492,16 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { uint32_t logCkSum = pReader->pHead->cksumBody; wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum); // pReader->curInvalid = 1; - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + taosThreadMutexUnlock(&pReader->mutex); - return -1; + + TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); } pReader->curVersion++; taosThreadMutexUnlock(&pReader->mutex); - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func) {