From a260a5f2f71110962014bc7b0b66399996b7f09a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 9 Sep 2024 11:49:44 +0800 Subject: [PATCH] enh: error handle in message encode and decode --- source/dnode/mnode/impl/src/mndCompact.c | 38 +- .../dnode/mnode/impl/src/mndCompactDetail.c | 110 +- source/dnode/vnode/src/tq/tqMeta.c | 71 +- source/libs/function/src/builtinsimpl.c | 75 +- source/libs/stream/src/streamMsg.c | 964 ++++++++++-------- 5 files changed, 716 insertions(+), 542 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index b8073885fd..bce8c5901d 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -55,36 +55,44 @@ void tFreeCompactObj(SCompactObj *pCompact) {} int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - - if (tEncodeI32(&encoder, pObj->compactId) < 0) return -1; - if (tEncodeCStr(&encoder, pObj->dbname) < 0) return -1; - if (tEncodeI64(&encoder, pObj->startTime) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) { - int8_t ex = 0; + int32_t code = 0; + int32_t lino; SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); - TAOS_CHECK_RETURN(tStartDecode(&decoder)); - - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactId)); - TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, pObj->dbname)); - TAOS_CHECK_RETURN(tDecodeI64(&decoder, &pObj->startTime)); + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactId)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) { @@ -633,8 +641,8 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) { static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { int32_t code = 0; - bool needSave = false; - void *pIter = NULL; + bool needSave = false; + void *pIter = NULL; while (1) { SCompactDetailObj *pDetail = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); diff --git a/source/dnode/mnode/impl/src/mndCompactDetail.c b/source/dnode/mnode/impl/src/mndCompactDetail.c index 86d398e7bd..d1bdb4734d 100644 --- a/source/dnode/mnode/impl/src/mndCompactDetail.c +++ b/source/dnode/mnode/impl/src/mndCompactDetail.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ #include "mndCompactDetail.h" -#include "mndTrans.h" -#include "mndShow.h" #include "mndDb.h" +#include "mndShow.h" +#include "mndTrans.h" #define MND_COMPACT_VER_NUMBER 1 @@ -35,21 +35,20 @@ int32_t mndInitCompactDetail(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupCompactDetail(SMnode *pMnode) { - mDebug("mnd compact detail cleanup"); -} +void mndCleanupCompactDetail(SMnode *pMnode) { mDebug("mnd compact detail cleanup"); } + +int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SCompactDetailObj *pCompactDetail = NULL; + char *sep = NULL; + SDbObj *pDb = NULL; -int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows){ - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - SCompactDetailObj *pCompactDetail = NULL; - char *sep = NULL; - SDbObj *pDb = NULL; - if (strlen(pShow->db) > 0) { sep = strchr(pShow->db, '.'); - if (sep && ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) { + if (sep && + ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) { sep++; } else { pDb = mndAcquireDb(pMnode, pShow->db); @@ -57,7 +56,7 @@ int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB } } - while(numOfRows < rows){ + while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_COMPACT_DETAIL, pShow->pIter, (void **)&pCompactDetail); if (pShow->pIter == NULL) break; @@ -94,53 +93,60 @@ int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB return numOfRows; } -void tFreeCompactDetailObj(SCompactDetailObj *pCompact) { -} +void tFreeCompactDetailObj(SCompactDetailObj *pCompact) {} int32_t tSerializeSCompactDetailObj(void *buf, int32_t bufLen, const SCompactDetailObj *pObj) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - - if (tEncodeI32(&encoder, pObj->compactDetailId) < 0) return -1; - if (tEncodeI32(&encoder, pObj->compactId) < 0) return -1; - if (tEncodeI32(&encoder, pObj->vgId) < 0) return -1; - if (tEncodeI32(&encoder, pObj->dnodeId) < 0) return -1; - if (tEncodeI32(&encoder, pObj->numberFileset) < 0) return -1; - if (tEncodeI32(&encoder, pObj->finished) < 0) return -1; - if (tEncodeI64(&encoder, pObj->startTime) < 0) return -1; - if (tEncodeI32(&encoder, pObj->newNumberFileset) < 0) return -1; - if (tEncodeI32(&encoder, pObj->newFinished) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactDetailId)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgId)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->dnodeId)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->numberFileset)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->finished)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->newNumberFileset)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->newFinished)); tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeSCompactDetailObj(void *buf, int32_t bufLen, SCompactDetailObj *pObj) { - int8_t ex = 0; + int32_t code = 0; + int32_t lino; SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); - TAOS_CHECK_RETURN(tStartDecode(&decoder)); - - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactDetailId)); - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactId)); - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->vgId)); - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->dnodeId)); - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->numberFileset)); - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->finished)); - TAOS_CHECK_RETURN(tDecodeI64(&decoder, &pObj->startTime)); - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->newNumberFileset)); - TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->newFinished)); + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactDetailId)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactId)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgId)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->dnodeId)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->numberFileset)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->finished)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->newNumberFileset)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->newFinished)); tEndDecode(&decoder); +_exit: tDecoderClear(&decoder); - return 0; + return code; } SSdbRaw *mndCompactDetailActionEncode(SCompactDetailObj *pCompact) { @@ -148,7 +154,7 @@ SSdbRaw *mndCompactDetailActionEncode(SCompactDetailObj *pCompact) { int32_t lino = 0; terrno = TSDB_CODE_SUCCESS; - void *buf = NULL; + void *buf = NULL; SSdbRaw *pRaw = NULL; int32_t tlen = tSerializeSCompactDetailObj(NULL, 0, pCompact); @@ -156,8 +162,8 @@ SSdbRaw *mndCompactDetailActionEncode(SCompactDetailObj *pCompact) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto OVER; } - - int32_t size = sizeof(int32_t) + tlen; + + int32_t size = sizeof(int32_t) + tlen; pRaw = sdbAllocRaw(SDB_COMPACT_DETAIL, MND_COMPACT_VER_NUMBER, size); if (pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -181,7 +187,6 @@ SSdbRaw *mndCompactDetailActionEncode(SCompactDetailObj *pCompact) { SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER); SDB_SET_DATALEN(pRaw, dataPos, OVER); - OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { @@ -263,9 +268,8 @@ int32_t mndCompactDetailActionDelete(SSdb *pSdb, SCompactDetailObj *pCompact) { } int32_t mndCompactDetailActionUpdate(SSdb *pSdb, SCompactDetailObj *pOldCompact, SCompactDetailObj *pNewCompact) { - mTrace("compact detail:%" PRId32 ", perform update action, old row:%p new row:%p", - pOldCompact->compactId, pOldCompact, pNewCompact); - + mTrace("compact detail:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, + pOldCompact, pNewCompact); pOldCompact->numberFileset = pNewCompact->numberFileset; pOldCompact->finished = pNewCompact->finished; @@ -273,8 +277,8 @@ int32_t mndCompactDetailActionUpdate(SSdb *pSdb, SCompactDetailObj *pOldCompact, return 0; } -int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SVgObj *pVgroup, - SVnodeGid *pVgid, int32_t index){ +int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SVgObj *pVgroup, + SVnodeGid *pVgid, int32_t index) { int32_t code = 0; SCompactDetailObj compactDetail = {0}; compactDetail.compactDetailId = index; @@ -287,8 +291,8 @@ int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* p compactDetail.newNumberFileset = -1; compactDetail.newFinished = -1; - mInfo("compact:%d, add compact detail to trans, index:%d, vgId:%d, dnodeId:%d", - compactDetail.compactId, compactDetail.compactDetailId, compactDetail.vgId, compactDetail.dnodeId); + mInfo("compact:%d, add compact detail to trans, index:%d, vgId:%d, dnodeId:%d", compactDetail.compactId, + compactDetail.compactDetailId, compactDetail.vgId, compactDetail.dnodeId); SSdbRaw *pVgRaw = mndCompactDetailActionEncode(&compactDetail); if (pVgRaw == NULL) return -1; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index db7766a7bb..60da4e3799 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -16,64 +16,79 @@ #include "tq.h" int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1; - if (tEncodeI8(pEncoder, pHandle->fetchMeta) < 0) return -1; - if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1; - if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1; - if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1; - if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->subKey)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->fetchMeta)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->consumerId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->snapshotVer)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pHandle->epoch)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pHandle->execHandle.subType)); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg)); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid); - if (tEncodeI32(pEncoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); void* pIter = NULL; pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); while (pIter) { int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL); - if (tEncodeI64(pEncoder, *tbUid) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, *tbUid)); pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid)); if (pHandle->execHandle.execTb.qmsg != NULL) { - if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg)); } } tEndEncode(pEncoder); - return pEncoder->pos; +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1; - if (tDecodeI8(pDecoder, &pHandle->fetchMeta) < 0) return -1; - if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1; - if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1; - if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1; - if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pHandle->subKey)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->fetchMeta)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->consumerId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->snapshotVer)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pHandle->epoch)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pHandle->execHandle.subType)); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg)); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) return -1; + if (pHandle->execHandle.execDb.pFilterOutTbUid == NULL) { + TAOS_CHECK_EXIT(terrno); + } int32_t size = 0; - if (tDecodeI32(pDecoder, &size) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); for (int32_t i = 0; i < size; i++) { int64_t tbUid = 0; - if (tDecodeI64(pDecoder, &tbUid) < 0) return -1; - if (taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0) != 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &tbUid)); + TAOS_CHECK_EXIT(taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0)); } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid)); if (!tDecodeIsEnd(pDecoder)) { - if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg)); } } tEndDecode(pDecoder); - return 0; + +_exit: + return code; } int32_t tqMetaDecodeCheckInfo(STqCheckInfo* info, void* pVal, int32_t vLen) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 30249fd300..fe6cb8317c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6437,63 +6437,74 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDistInfo* pInfo) { SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->rowSize)); - if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeU16(&encoder, pInfo->numOfFiles)); + TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfBlocks)); + TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfTables)); - if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1; - if (tEncodeU64(&encoder, pInfo->totalRows) < 0) return -1; - if (tEncodeI32(&encoder, pInfo->maxRows) < 0) return -1; - if (tEncodeI32(&encoder, pInfo->minRows) < 0) return -1; - if (tEncodeI32(&encoder, pInfo->defMaxRows) < 0) return -1; - if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->numOfSttRows) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeU64(&encoder, pInfo->totalSize)); + TAOS_CHECK_EXIT(tEncodeU64(&encoder, pInfo->totalRows)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->maxRows)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->minRows)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->defMaxRows)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->defMinRows)); + TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfInmemRows)); + TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfSttRows)); + TAOS_CHECK_EXIT(tEncodeU32(&encoder, pInfo->numOfVgroups)); for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { - if (tEncodeI32(&encoder, pInfo->blockRowsHisto[i]) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pInfo->blockRowsHisto[i])); } tEndEncode(&encoder); - int32_t tlen = encoder.pos; +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } tEncoderClear(&encoder); return tlen; } int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo* pInfo) { SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->rowSize)); - if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeU16(&decoder, &pInfo->numOfFiles)); + TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfBlocks)); + TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfTables)); - if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1; - if (tDecodeU64(&decoder, &pInfo->totalRows) < 0) return -1; - if (tDecodeI32(&decoder, &pInfo->maxRows) < 0) return -1; - if (tDecodeI32(&decoder, &pInfo->minRows) < 0) return -1; - if (tDecodeI32(&decoder, &pInfo->defMaxRows) < 0) return -1; - if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->numOfSttRows) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pInfo->totalSize)); + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pInfo->totalRows)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->maxRows)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->minRows)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->defMaxRows)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->defMinRows)); + TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfInmemRows)); + TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfSttRows)); + TAOS_CHECK_EXIT(tDecodeU32(&decoder, &pInfo->numOfVgroups)); for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { - if (tDecodeI32(&decoder, &pInfo->blockRowsHisto[i]) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pInfo->blockRowsHisto[i])); } +_exit: tDecoderClear(&decoder); - return 0; + return code; } int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 8105614d76..1c512888e7 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -19,281 +19,347 @@ #include "streamInt.h" int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { - if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; - if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId)); + TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage)); return 0; } int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) { - if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; - if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId)); + TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage)); return 0; } int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; - if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1; + TAOS_CHECK_RETURN(tStartEncode(pEncoder)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId)); + TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId)); + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger)); tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; - if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1; + TAOS_CHECK_RETURN(tStartDecode(pDecoder)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId)); + TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId)); + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger)); tEndDecode(pDecoder); return 0; } int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1; + TAOS_CHECK_RETURN(tStartEncode(pEncoder)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime)); + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success)); tEndEncode(pEncoder); return pEncoder->pos; } int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId)); int32_t size = taosArrayGetSize(pMsg->pNodeList); - if (tEncodeI32(pEncoder, size) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); for (int32_t i = 0; i < size; ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i); if (pInfo == NULL) { - return terrno; + TAOS_CHECK_EXIT(terrno); } - if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp)); } // todo this new attribute will be result in being incompatible with previous version - if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId)); tEndEncode(pEncoder); - return pEncoder->pos; +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) { int32_t code = 0; + int32_t lino; - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId)); int32_t size = 0; - if (tDecodeI32(pDecoder, &size) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); + if (pMsg->pNodeList == NULL) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < size; ++i) { SNodeUpdateInfo info = {0}; - if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp)); - void* p = taosArrayPush(pMsg->pNodeList, &info); - if (p == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + if (taosArrayPush(pMsg->pNodeList, &info) == NULL) { + TAOS_CHECK_EXIT(terrno); } } - if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId)); tEndDecode(pDecoder); +_exit: return code; } int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage)); tEndEncode(pEncoder); - return pEncoder->pos; + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage)); tEndDecode(pDecoder); - return 0; + +_exit: + return code; } int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status)); tEndEncode(pEncoder); - return pEncoder->pos; + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status)); tEndDecode(pDecoder); - return 0; + +_exit: + return code; } int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId)); tEndEncode(pEncoder); - return pEncoder->pos; + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId)); tEndDecode(pDecoder); - return 0; + +_exit: + return code; } int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen)); if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) { stError("invalid dispatch req msg"); - return TSDB_CODE_INVALID_MSG; + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); } for (int32_t i = 0; i < pReq->blockNum; i++) { int32_t* pLen = taosArrayGet(pReq->dataLen, i); void* data = taosArrayGetP(pReq->data, i); if (data == NULL || pLen == NULL) { - return terrno; + TAOS_CHECK_EXIT(terrno); } - if (tEncodeI32(pEncoder, *pLen) < 0) return -1; - if (tEncodeBinary(pEncoder, data, *pLen) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen)); + TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen)); } tEndEncode(pEncoder); - return pEncoder->pos; +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; + int32_t code = 0; + int32_t lino; - pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); - pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen)); + + if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < pReq->blockNum; i++) { int32_t len1; uint64_t len2; void* data; - if (tDecodeI32(pDecoder, &len1) < 0) return -1; - if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1)); + TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2)); if (len1 != len2) { - return TSDB_CODE_INVALID_MSG; + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); } - void* p = taosArrayPush(pReq->dataLen, &len1); - if (p == NULL) { - tEndDecode(pDecoder); - return TSDB_CODE_OUT_OF_MEMORY; + if (taosArrayPush(pReq->dataLen, &len1) == NULL) { + TAOS_CHECK_EXIT(terrno); } - p = taosArrayPush(pReq->data, &data); - if (p == NULL) { - tEndDecode(pDecoder); - return TSDB_CODE_OUT_OF_MEMORY; + if (taosArrayPush(pReq->data, &data) == NULL) { + TAOS_CHECK_EXIT(terrno); } } tEndDecode(pDecoder); - return 0; +_exit: + return code; } void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { @@ -302,188 +368,220 @@ void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { } int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1; - if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId)); + TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen)); tEndEncode(pEncoder); - return pEncoder->pos; + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId)); uint64_t len = 0; - if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len)); pReq->retrieveLen = (int32_t)len; tEndDecode(pDecoder); - return 0; + +_exit: + return code; } void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); tEndEncode(pEncoder); - return 0; + +_exit: + return code; } int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); tEndDecode(pDecoder); - return 0; + +_exit: + return code; } int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks)); for (int32_t i = 0; i < pReq->numOfTasks; ++i) { STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); if (ps == NULL) { - return terrno; + TAOS_CHECK_EXIT(terrno); } - if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; - if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, ps->status) < 0) return -1; - if (tEncodeI64(pEncoder, ps->stage) < 0) return -1; - if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->procsTotal) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->procsThroughput) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->outputTotal) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->outputThroughput) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; - if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; - if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1; - if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1; - if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.latestSize) < 0) return -1; - if (tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup) < 0) return -1; - if (tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs) < 0) return -1; - if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; - if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; - if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->id.streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->id.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->status)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->stage)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->nodeId)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputQUsed)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputRate)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsTotal)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsThroughput)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputTotal)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputThroughput)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkQuota)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkDataSize)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->processedVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.minVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.maxVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.activeId)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.failed)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestTime)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestSize)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startTime)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->hTaskId)); } int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); - if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfVgs)); for (int j = 0; j < numOfVgs; ++j) { int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j); if (pVgId == NULL) { - return terrno; + TAOS_CHECK_EXIT(terrno); } - if (tEncodeI32(pEncoder, *pVgId) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId)); } - if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts)); tEndEncode(pEncoder); - return pEncoder->pos; + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { int32_t code = 0; + int32_t lino; - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks)); - pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); + if ((pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < pReq->numOfTasks; ++i) { int32_t taskId = 0; STaskStatusEntry entry = {0}; - if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.procsTotal) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.procsThroughput) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.outputTotal) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.outputThroughput) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; - if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1; - if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.status)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.stage)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.nodeId)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputQUsed)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputRate)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsTotal)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsThroughput)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputTotal)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputThroughput)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkQuota)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkDataSize)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.processedVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.minVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.maxVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.activeId)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.failed)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId)); - if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize) < 0) return -1; - if (tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup) < 0) return -1; - if (tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startTime)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.hTaskId)); entry.id.taskId = taskId; - void* p = taosArrayPush(pReq->pTaskStatus, &entry); - if (p == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + if (taosArrayPush(pReq->pTaskStatus, &entry) == NULL) { + TAOS_CHECK_EXIT(terrno); } } int32_t numOfVgs = 0; - if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfVgs)); - pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t)); + if ((pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } for (int j = 0; j < numOfVgs; ++j) { int32_t vgId = 0; - if (tDecodeI32(pDecoder, &vgId) < 0) return -1; - void* p = taosArrayPush(pReq->pUpdateNodes, &vgId); - if (p == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId)); + if (taosArrayPush(pReq->pUpdateNodes, &vgId) == NULL) { + TAOS_CHECK_EXIT(terrno); } } - if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts)); tEndDecode(pDecoder); - return 0; - _err: - tEndDecode(pDecoder); +_exit: return code; } @@ -508,212 +606,250 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { } int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1; - if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; + int32_t code = 0; + int32_t lino; - if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1; + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.totalLevel)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); - if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); - if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); - if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1; + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); int32_t taskId = pTask->hTaskInfo.id.taskId; - if (tEncodeI32(pEncoder, taskId)) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); taskId = pTask->streamTaskId.taskId; - if (tEncodeI32(pEncoder, taskId)) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; - if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; - if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; - if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); - if (tEncodeI32(pEncoder, epSz) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); for (int32_t i = 0; i < epSz; i++) { SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); } if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); } if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; - if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); + TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; + TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); } - if (tEncodeI64(pEncoder, pTask->info.delaySchedParam) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); + TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); tEndEncode(pEncoder); - return 0; +_exit: + return code; } int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { int32_t taskId = 0; + int32_t code = 0; + int32_t lino; - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1; + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); + } - if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1; - if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.totalLevel)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); - if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); - if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); - if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); - if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1; - if (tDecodeI32(pDecoder, &taskId)) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); pTask->hTaskInfo.id.taskId = taskId; - if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; - if (tDecodeI32(pDecoder, &taskId)) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); pTask->streamTaskId.taskId = taskId; - if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1; - if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1; - if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; - if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); int32_t epSz = -1; - if (tDecodeI32(pDecoder, &epSz) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); - pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES); + if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { + TAOS_CHECK_EXIT(terrno); + } for (int32_t i = 0; i < epSz; i++) { SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); - if (pInfo == NULL) return -1; - if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) { - taosMemoryFreeClear(pInfo); - return -1; + if (pInfo == NULL) { + TAOS_CHECK_EXIT(terrno); } - void* p = taosArrayPush(pTask->upstreamInfo.pList, &pInfo); - if (p == NULL) { - tEndDecode(pDecoder); - return -1; + if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { + taosMemoryFreeClear(pInfo); + goto _exit; + } + if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { + TAOS_CHECK_EXIT(terrno); } } if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); } if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1; - if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; + if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { + TAOS_CHECK_EXIT(terrno); + } + TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; + TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); } - if (tDecodeI64(pDecoder, &pTask->info.delaySchedParam) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { - if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); } - if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); tEndDecode(pDecoder); - return 0; + +_exit: + return code; } int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->checkpointVer) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->checkpointTs) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; - if (tEncodeI8(pEncoder, pReq->dropHTask) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask)); tEndEncode(pEncoder); - return 0; + +_exit: + return code; } int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->checkpointVer) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->checkpointTs) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; - if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask)); tEndDecode(pDecoder); - return 0; + +_exit: + return code; } -int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; +int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); tEndEncode(pEncoder); - return pEncoder->pos; + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } } int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); tEndDecode(pDecoder); - return 0; + +_exit: + return code; }