From 32222375e69acd94a9415a512d1d07aa04217c63 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 3 Sep 2024 10:32:06 +0800 Subject: [PATCH] enh: improve error handle in message encode and decode --- source/common/src/tmsg.c | 171 +++++++++++--------- source/libs/sync/src/syncRequestVoteReply.c | 6 +- 2 files changed, 98 insertions(+), 79 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f02463656d..579ffaa889 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -153,7 +153,9 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { } int32_t tInitSubmitBlkIter(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { - if (pMsgIter->dataLen <= 0) return -1; + if (pMsgIter->dataLen <= 0) { + return TSDB_CODE_INVALID_PARA; + } pIter->totalLen = pMsgIter->dataLen; pIter->len = 0; pIter->row = (STSRow *)(pBlock->data + pMsgIter->schemaLen); @@ -174,27 +176,6 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { } } -#ifdef BUILD_NO_CALL -int32_t tPrintFixedSchemaSubmitReq(SSubmitReq *pReq, STSchema *pTschema) { - SSubmitMsgIter msgIter = {0}; - if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1; - while (true) { - SSubmitBlk *pBlock = NULL; - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; - if (pBlock == NULL) break; - SSubmitBlkIter blkIter = {0}; - tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); - STSRowIter rowIter = {0}; - tdSTSRowIterInit(&rowIter, pTschema); - STSRow *row; - while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { - tdSRowPrint(row, pTschema, "stream"); - } - } - return 0; -} -#endif - int32_t tEncodeSEpSet(SEncoder *pEncoder, const SEpSet *pEp) { TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->inUse)); TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->numOfEps)); @@ -346,14 +327,18 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum)); if (queryNum) { pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query)); - if (NULL == pReq->query) return -1; + if (NULL == pReq->query) { + return terrno; + } TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pReq->query->connId)); int32_t num = 0; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &num)); if (num > 0) { pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc)); - if (NULL == pReq->query->queryDesc) return -1; + if (NULL == pReq->query->queryDesc) { + return terrno; + } for (int32_t i = 0; i < num; ++i) { SQueryDesc desc = {0}; @@ -371,7 +356,9 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &snum)); if (snum > 0) { desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc)); - if (NULL == desc.subDesc) return -1; + if (NULL == desc.subDesc) { + return terrno; + } for (int32_t m = 0; m < snum; ++m) { SQuerySubDesc sDesc = {0}; @@ -457,7 +444,9 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp) TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum)); if (queryNum) { pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query)); - if (NULL == pRsp->query) return -1; + if (NULL == pRsp->query) { + return terrno; + } TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pRsp->query->connId)); TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pRsp->query->killRid)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->query->totalDnodes)); @@ -480,7 +469,9 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp) int32_t kvNum = 0; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &kvNum)); pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); - if (pRsp->info == NULL) return -1; + if (pRsp->info == NULL) { + return terrno; + } for (int32_t i = 0; i < kvNum; i++) { SKv kv = {0}; TAOS_CHECK_RETURN(tDecodeSKv(pDecoder, &kv)); @@ -492,108 +483,136 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp) int32_t tSerializeSClientHbBatchReq(void *buf, int32_t bufLen, const SClientHbBatchReq *pBatchReq) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen = 0; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI64(&encoder, pBatchReq->reqId) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pBatchReq->reqId)); int32_t reqNum = taosArrayGetSize(pBatchReq->reqs); - if (tEncodeI32(&encoder, reqNum) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(&encoder, reqNum)); for (int32_t i = 0; i < reqNum; i++) { SClientHbReq *pReq = taosArrayGet(pBatchReq->reqs, i); - if (tSerializeSClientHbReq(&encoder, pReq) < 0) return -1; + TAOS_CHECK_EXIT(tSerializeSClientHbReq(&encoder, pReq)); } - if (tEncodeI64(&encoder, pBatchReq->ipWhiteList) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pBatchReq->ipWhiteList)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchReq *pBatchReq) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI64(&decoder, &pBatchReq->reqId) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBatchReq->reqId)); int32_t reqNum = 0; - if (tDecodeI32(&decoder, &reqNum) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &reqNum)); if (reqNum > 0) { pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq)); - if (NULL == pBatchReq->reqs) return -1; + if (NULL == pBatchReq->reqs) { + return terrno; + } } for (int32_t i = 0; i < reqNum; i++) { SClientHbReq req = {0}; - if (tDeserializeSClientHbReq(&decoder, &req) < 0) return -1; - if (!taosArrayPush(pBatchReq->reqs, &req)) return -1; + TAOS_CHECK_EXIT(tDeserializeSClientHbReq(&decoder, &req)); + if (!taosArrayPush(pBatchReq->reqs, &req)) { + TAOS_CHECK_EXIT(terrno); + } } if (!tDecodeIsEnd(&decoder)) { - if (tDecodeI64(&decoder, &pBatchReq->ipWhiteList) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBatchReq->ipWhiteList)); } tEndDecode(&decoder); + +_exit: tDecoderClear(&decoder); - return 0; + return code; } int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBatchRsp *pBatchRsp) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI64(&encoder, pBatchRsp->reqId) < 0) return -1; - if (tEncodeI64(&encoder, pBatchRsp->rspId) < 0) return -1; - if (tEncodeI32(&encoder, pBatchRsp->svrTimestamp) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pBatchRsp->reqId)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pBatchRsp->rspId)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pBatchRsp->svrTimestamp)); int32_t rspNum = taosArrayGetSize(pBatchRsp->rsps); - if (tEncodeI32(&encoder, rspNum) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(&encoder, rspNum)); for (int32_t i = 0; i < rspNum; i++) { SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i); - if (tSerializeSClientHbRsp(&encoder, pRsp) < 0) return -1; + TAOS_CHECK_EXIT(tSerializeSClientHbRsp(&encoder, pRsp)); } - if (tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas) < 0) return -1; + TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchRsp *pBatchRsp) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; tDecoderInit(&decoder, buf, bufLen); - int32_t ret = -1; - if (tStartDecode(&decoder) < 0) goto _END; - if (tDecodeI64(&decoder, &pBatchRsp->reqId) < 0) goto _END; - if (tDecodeI64(&decoder, &pBatchRsp->rspId) < 0) goto _END; - if (tDecodeI32(&decoder, &pBatchRsp->svrTimestamp) < 0) goto _END; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBatchRsp->reqId)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBatchRsp->rspId)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pBatchRsp->svrTimestamp)); int32_t rspNum = 0; - if (tDecodeI32(&decoder, &rspNum) < 0) goto _END; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &rspNum)); if (pBatchRsp->rsps == NULL) { - if ((pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbRsp))) == NULL) goto _END; + if ((pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbRsp))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } for (int32_t i = 0; i < rspNum; i++) { SClientHbRsp rsp = {0}; - if (tDeserializeSClientHbRsp(&decoder, &rsp) < 0) goto _END; - if (taosArrayPush(pBatchRsp->rsps, &rsp) == NULL) goto _END; + TAOS_CHECK_EXIT(tDeserializeSClientHbRsp(&decoder, &rsp)); + if (taosArrayPush(pBatchRsp->rsps, &rsp) == NULL) { + TAOS_CHECK_EXIT(terrno); + } } if (!tDecodeIsEnd(&decoder)) { - if (tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas) < 0) goto _END; + TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas)); } tEndDecode(&decoder); - ret = 0; -_END: +_exit: tDecoderClear(&decoder); - return ret; + return code; } int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) { @@ -5264,27 +5283,27 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { tDecoderInit(&decoder, buf, bufLen); int32_t ret = -1; - if (tStartDecode(&decoder) < 0) goto _END; - if (tDecodeI32(&decoder, &pRsp->acctId) < 0) goto _END; - if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) goto _END; - if (tDecodeU32(&decoder, &pRsp->connId) < 0) goto _END; - if (tDecodeI32(&decoder, &pRsp->dnodeNum) < 0) goto _END; - if (tDecodeI8(&decoder, &pRsp->superUser) < 0) goto _END; - if (tDecodeI8(&decoder, &pRsp->sysInfo) < 0) goto _END; - if (tDecodeI8(&decoder, &pRsp->connType) < 0) goto _END; - if (tDecodeSEpSet(&decoder,&pRsp->epSet) < 0) goto _END; - if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) goto _END; - if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) goto _END; - if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) goto _END; + if (tStartDecode(&decoder) < 0) goto _END; + if (tDecodeI32(&decoder, &pRsp->acctId) < 0) goto _END; + if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) goto _END; + if (tDecodeU32(&decoder, &pRsp->connId) < 0) goto _END; + if (tDecodeI32(&decoder, &pRsp->dnodeNum) < 0) goto _END; + if (tDecodeI8(&decoder, &pRsp->superUser) < 0) goto _END; + if (tDecodeI8(&decoder, &pRsp->sysInfo) < 0) goto _END; + if (tDecodeI8(&decoder, &pRsp->connType) < 0) goto _END; + if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) goto _END; + if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) goto _END; + if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) goto _END; + if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) goto _END; if (!tDecodeIsEnd(&decoder)) { - if (tDecodeI32(&decoder, &pRsp->passVer) < 0) goto _END; + if (tDecodeI32(&decoder, &pRsp->passVer) < 0) goto _END; } else { pRsp->passVer = 0; } // since 3.0.7.0 if (!tDecodeIsEnd(&decoder)) { - if (tDecodeI32(&decoder, &pRsp->authVer) < 0) goto _END; + if (tDecodeI32(&decoder, &pRsp->authVer) < 0) goto _END; } else { pRsp->authVer = 0; } diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index add237c2d0..9f2d746755 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -53,14 +53,14 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->term < currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); - TAOS_RETURN(TSDB_CODE_FAILED); + TAOS_RETURN(TSDB_CODE_SYN_WRONG_TERM); } if (pMsg->term > currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); - TAOS_RETURN(TSDB_CODE_FAILED); + TAOS_RETURN(TSDB_CODE_SYN_WRONG_TERM); } syncLogRecvRequestVoteReply(ths, pMsg, ""); @@ -73,7 +73,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sNError(ths, "vote respond error vote-respond-mgr term:%" PRIu64 ", msg term:%" PRIu64 "", ths->pVotesRespond->term, pMsg->term); - TAOS_RETURN(TSDB_CODE_FAILED); + TAOS_RETURN(TSDB_CODE_SYN_WRONG_TERM); } votesRespondAdd(ths->pVotesRespond, pMsg);