Merge pull request #27629 from taosdata/enh/TD-31803-1

enh: improve error handle in message encode and decode
This commit is contained in:
Hongze Cheng 2024-09-03 11:42:38 +08:00 committed by GitHub
commit b2f2d56c41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 98 additions and 79 deletions

View File

@ -153,7 +153,9 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
} }
int32_t tInitSubmitBlkIter(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { int32_t tInitSubmitBlkIter(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pMsgIter->dataLen <= 0) return -1; if (pMsgIter->dataLen <= 0) {
return TSDB_CODE_INVALID_PARA;
}
pIter->totalLen = pMsgIter->dataLen; pIter->totalLen = pMsgIter->dataLen;
pIter->len = 0; pIter->len = 0;
pIter->row = (STSRow *)(pBlock->data + pMsgIter->schemaLen); pIter->row = (STSRow *)(pBlock->data + pMsgIter->schemaLen);
@ -174,27 +176,6 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
} }
} }
#ifdef BUILD_NO_CALL
int32_t tPrintFixedSchemaSubmitReq(SSubmitReq *pReq, STSchema *pTschema) {
SSubmitMsgIter msgIter = {0};
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
while (true) {
SSubmitBlk *pBlock = NULL;
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
SSubmitBlkIter blkIter = {0};
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
STSRowIter rowIter = {0};
tdSTSRowIterInit(&rowIter, pTschema);
STSRow *row;
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
tdSRowPrint(row, pTschema, "stream");
}
}
return 0;
}
#endif
int32_t tEncodeSEpSet(SEncoder *pEncoder, const SEpSet *pEp) { int32_t tEncodeSEpSet(SEncoder *pEncoder, const SEpSet *pEp) {
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->inUse)); TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->inUse));
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->numOfEps)); TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->numOfEps));
@ -346,14 +327,18 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum));
if (queryNum) { if (queryNum) {
pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query)); pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query));
if (NULL == pReq->query) return -1; if (NULL == pReq->query) {
return terrno;
}
TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pReq->query->connId)); TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pReq->query->connId));
int32_t num = 0; int32_t num = 0;
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &num)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &num));
if (num > 0) { if (num > 0) {
pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc)); pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc));
if (NULL == pReq->query->queryDesc) return -1; if (NULL == pReq->query->queryDesc) {
return terrno;
}
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SQueryDesc desc = {0}; SQueryDesc desc = {0};
@ -371,7 +356,9 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &snum)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &snum));
if (snum > 0) { if (snum > 0) {
desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc)); desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc));
if (NULL == desc.subDesc) return -1; if (NULL == desc.subDesc) {
return terrno;
}
for (int32_t m = 0; m < snum; ++m) { for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc sDesc = {0}; SQuerySubDesc sDesc = {0};
@ -457,7 +444,9 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp)
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum));
if (queryNum) { if (queryNum) {
pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query)); pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query));
if (NULL == pRsp->query) return -1; if (NULL == pRsp->query) {
return terrno;
}
TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pRsp->query->connId)); TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pRsp->query->connId));
TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pRsp->query->killRid)); TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pRsp->query->killRid));
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->query->totalDnodes)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->query->totalDnodes));
@ -480,7 +469,9 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp)
int32_t kvNum = 0; int32_t kvNum = 0;
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &kvNum)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &kvNum));
pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); pRsp->info = taosArrayInit(kvNum, sizeof(SKv));
if (pRsp->info == NULL) return -1; if (pRsp->info == NULL) {
return terrno;
}
for (int32_t i = 0; i < kvNum; i++) { for (int32_t i = 0; i < kvNum; i++) {
SKv kv = {0}; SKv kv = {0};
TAOS_CHECK_RETURN(tDecodeSKv(pDecoder, &kv)); TAOS_CHECK_RETURN(tDecodeSKv(pDecoder, &kv));
@ -492,108 +483,136 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp)
int32_t tSerializeSClientHbBatchReq(void *buf, int32_t bufLen, const SClientHbBatchReq *pBatchReq) { int32_t tSerializeSClientHbBatchReq(void *buf, int32_t bufLen, const SClientHbBatchReq *pBatchReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
int32_t code = 0;
int32_t lino;
int32_t tlen = 0;
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; TAOS_CHECK_EXIT(tStartEncode(&encoder));
if (tEncodeI64(&encoder, pBatchReq->reqId) < 0) return -1; TAOS_CHECK_EXIT(tEncodeI64(&encoder, pBatchReq->reqId));
int32_t reqNum = taosArrayGetSize(pBatchReq->reqs); int32_t reqNum = taosArrayGetSize(pBatchReq->reqs);
if (tEncodeI32(&encoder, reqNum) < 0) return -1; TAOS_CHECK_EXIT(tEncodeI32(&encoder, reqNum));
for (int32_t i = 0; i < reqNum; i++) { for (int32_t i = 0; i < reqNum; i++) {
SClientHbReq *pReq = taosArrayGet(pBatchReq->reqs, i); SClientHbReq *pReq = taosArrayGet(pBatchReq->reqs, i);
if (tSerializeSClientHbReq(&encoder, pReq) < 0) return -1; TAOS_CHECK_EXIT(tSerializeSClientHbReq(&encoder, pReq));
} }
if (tEncodeI64(&encoder, pBatchReq->ipWhiteList) < 0) return -1; TAOS_CHECK_EXIT(tEncodeI64(&encoder, pBatchReq->ipWhiteList));
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; _exit:
if (code) {
tlen = code;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchReq *pBatchReq) { int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchReq *pBatchReq) {
SDecoder decoder = {0}; SDecoder decoder = {0};
int32_t code = 0;
int32_t lino;
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; TAOS_CHECK_EXIT(tStartDecode(&decoder));
if (tDecodeI64(&decoder, &pBatchReq->reqId) < 0) return -1; TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBatchReq->reqId));
int32_t reqNum = 0; int32_t reqNum = 0;
if (tDecodeI32(&decoder, &reqNum) < 0) return -1; TAOS_CHECK_EXIT(tDecodeI32(&decoder, &reqNum));
if (reqNum > 0) { if (reqNum > 0) {
pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq)); pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq));
if (NULL == pBatchReq->reqs) return -1; if (NULL == pBatchReq->reqs) {
return terrno;
}
} }
for (int32_t i = 0; i < reqNum; i++) { for (int32_t i = 0; i < reqNum; i++) {
SClientHbReq req = {0}; SClientHbReq req = {0};
if (tDeserializeSClientHbReq(&decoder, &req) < 0) return -1; TAOS_CHECK_EXIT(tDeserializeSClientHbReq(&decoder, &req));
if (!taosArrayPush(pBatchReq->reqs, &req)) return -1; if (!taosArrayPush(pBatchReq->reqs, &req)) {
TAOS_CHECK_EXIT(terrno);
}
} }
if (!tDecodeIsEnd(&decoder)) { if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pBatchReq->ipWhiteList) < 0) return -1; TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBatchReq->ipWhiteList));
} }
tEndDecode(&decoder); tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return code;
} }
int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBatchRsp *pBatchRsp) { int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBatchRsp *pBatchRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
int32_t code = 0;
int32_t lino;
int32_t tlen;
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; TAOS_CHECK_EXIT(tStartEncode(&encoder));
if (tEncodeI64(&encoder, pBatchRsp->reqId) < 0) return -1; TAOS_CHECK_EXIT(tEncodeI64(&encoder, pBatchRsp->reqId));
if (tEncodeI64(&encoder, pBatchRsp->rspId) < 0) return -1; TAOS_CHECK_EXIT(tEncodeI64(&encoder, pBatchRsp->rspId));
if (tEncodeI32(&encoder, pBatchRsp->svrTimestamp) < 0) return -1; TAOS_CHECK_EXIT(tEncodeI32(&encoder, pBatchRsp->svrTimestamp));
int32_t rspNum = taosArrayGetSize(pBatchRsp->rsps); int32_t rspNum = taosArrayGetSize(pBatchRsp->rsps);
if (tEncodeI32(&encoder, rspNum) < 0) return -1; TAOS_CHECK_EXIT(tEncodeI32(&encoder, rspNum));
for (int32_t i = 0; i < rspNum; i++) { for (int32_t i = 0; i < rspNum; i++) {
SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i); SClientHbRsp *pRsp = taosArrayGet(pBatchRsp->rsps, i);
if (tSerializeSClientHbRsp(&encoder, pRsp) < 0) return -1; TAOS_CHECK_EXIT(tSerializeSClientHbRsp(&encoder, pRsp));
} }
if (tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas) < 0) return -1; TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas));
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; _exit:
if (code) {
tlen = code;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder); tEncoderClear(&encoder);
return tlen; return tlen;
} }
int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchRsp *pBatchRsp) { int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchRsp *pBatchRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
int32_t code = 0;
int32_t lino;
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
int32_t ret = -1;
if (tStartDecode(&decoder) < 0) goto _END; TAOS_CHECK_EXIT(tStartDecode(&decoder));
if (tDecodeI64(&decoder, &pBatchRsp->reqId) < 0) goto _END; TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBatchRsp->reqId));
if (tDecodeI64(&decoder, &pBatchRsp->rspId) < 0) goto _END; TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pBatchRsp->rspId));
if (tDecodeI32(&decoder, &pBatchRsp->svrTimestamp) < 0) goto _END; TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pBatchRsp->svrTimestamp));
int32_t rspNum = 0; int32_t rspNum = 0;
if (tDecodeI32(&decoder, &rspNum) < 0) goto _END; TAOS_CHECK_EXIT(tDecodeI32(&decoder, &rspNum));
if (pBatchRsp->rsps == NULL) { if (pBatchRsp->rsps == NULL) {
if ((pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbRsp))) == NULL) goto _END; if ((pBatchRsp->rsps = taosArrayInit(rspNum, sizeof(SClientHbRsp))) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
} }
for (int32_t i = 0; i < rspNum; i++) { for (int32_t i = 0; i < rspNum; i++) {
SClientHbRsp rsp = {0}; SClientHbRsp rsp = {0};
if (tDeserializeSClientHbRsp(&decoder, &rsp) < 0) goto _END; TAOS_CHECK_EXIT(tDeserializeSClientHbRsp(&decoder, &rsp));
if (taosArrayPush(pBatchRsp->rsps, &rsp) == NULL) goto _END; if (taosArrayPush(pBatchRsp->rsps, &rsp) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
} }
if (!tDecodeIsEnd(&decoder)) { if (!tDecodeIsEnd(&decoder)) {
if (tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas) < 0) goto _END; TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas));
} }
tEndDecode(&decoder); tEndDecode(&decoder);
ret = 0;
_END: _exit:
tDecoderClear(&decoder); tDecoderClear(&decoder);
return ret; return code;
} }
int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) { int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) {
@ -5272,7 +5291,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) goto _END; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) goto _END;
if (tDecodeI8(&decoder, &pRsp->sysInfo) < 0) goto _END; if (tDecodeI8(&decoder, &pRsp->sysInfo) < 0) goto _END;
if (tDecodeI8(&decoder, &pRsp->connType) < 0) goto _END; if (tDecodeI8(&decoder, &pRsp->connType) < 0) goto _END;
if (tDecodeSEpSet(&decoder,&pRsp->epSet) < 0) goto _END; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) goto _END;
if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) goto _END; if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) goto _END;
if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) goto _END; if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) goto _END;
if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) goto _END; if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) goto _END;

View File

@ -53,14 +53,14 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->term < currentTerm) { if (pMsg->term < currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response"); syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
TAOS_RETURN(TSDB_CODE_FAILED); TAOS_RETURN(TSDB_CODE_SYN_WRONG_TERM);
} }
if (pMsg->term > currentTerm) { if (pMsg->term > currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "error term"); syncLogRecvRequestVoteReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
TAOS_RETURN(TSDB_CODE_FAILED); TAOS_RETURN(TSDB_CODE_SYN_WRONG_TERM);
} }
syncLogRecvRequestVoteReply(ths, pMsg, ""); syncLogRecvRequestVoteReply(ths, pMsg, "");
@ -73,7 +73,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sNError(ths, "vote respond error vote-respond-mgr term:%" PRIu64 ", msg term:%" PRIu64 "", sNError(ths, "vote respond error vote-respond-mgr term:%" PRIu64 ", msg term:%" PRIu64 "",
ths->pVotesRespond->term, pMsg->term); ths->pVotesRespond->term, pMsg->term);
TAOS_RETURN(TSDB_CODE_FAILED); TAOS_RETURN(TSDB_CODE_SYN_WRONG_TERM);
} }
votesRespondAdd(ths->pVotesRespond, pMsg); votesRespondAdd(ths->pVotesRespond, pMsg);