From 875eabdbf5262e158e5d313fe4e24198285459bd Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 8 Aug 2022 16:18:05 +0800 Subject: [PATCH] fix: send rsma fetch msg to designated vg --- include/common/tmsg.h | 3 -- include/util/taoserror.h | 2 + source/dnode/vnode/src/sma/smaRollup.c | 40 ++++++++++++++++--- source/util/src/terror.c | 2 + .../system-test/1-insert/create_retentions.py | 2 +- 5 files changed, 39 insertions(+), 10 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 716f51933e..b32129bfd7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2658,7 +2658,6 @@ typedef struct { } SVgEpSet; typedef struct { - int64_t refId; int64_t suid; int8_t level; } SRSmaFetchMsg; @@ -2666,7 +2665,6 @@ typedef struct { static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) { if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI64(pCoder, pReq->refId) < 0) return -1; if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; if (tEncodeI8(pCoder, pReq->level) < 0) return -1; @@ -2677,7 +2675,6 @@ static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFe static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) { if (tStartDecode(pCoder) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1; if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; if (tDecodeI8(pCoder, &pReq->level) < 0) return -1; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 27fb057b44..eab6e5561f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -610,6 +610,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152) #define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153) #define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) +#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) +#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6b882251f4..662558529d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1389,7 +1389,7 @@ _end: * @return int32_t */ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { - SRSmaFetchMsg fetchMsg = {.refId = pInfo->refId, .suid = pInfo->suid, .level = level}; + SRSmaFetchMsg fetchMsg = { .suid = pInfo->suid, .level = level}; int32_t ret = 0; int32_t contLen = 0; SEncoder encoder = {0}; @@ -1400,13 +1400,17 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { goto _err; } - void *pBuf = rpcMallocCont(contLen); - tEncoderInit(&encoder, pBuf, contLen); + void *pBuf = rpcMallocCont(contLen + sizeof(SMsgHead)); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), contLen); if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tEncoderClear(&encoder); } tEncoderClear(&encoder); + + ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma); + ((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead); + SRpcMsg rpcMsg = { .code = 0, .msgType = TDMT_VND_FETCH_RSMA, @@ -1415,24 +1419,42 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { }; if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) { - smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%d level:%" PRIi8 " since %s", + smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), pInfo->suid, level, terrstr()); goto _err; } + smaDebug("vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), + pInfo->suid, level); + return TSDB_CODE_SUCCESS; _err: return TSDB_CODE_FAILED; } +/** + * @brief fetch rsma data of level 2/3 and submit + * + * @param pSma + * @param pMsg + * @return int32_t + */ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; SRSmaFetchMsg req = {0}; SDecoder decoder = {0}; + void *pBuf = NULL; SRSmaInfo *pInfo = NULL; SRSmaInfoItem *pItem = NULL; - tDecoderInit(&decoder, pRpcMsg->pCont, pRpcMsg->contLen); + if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { + terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; + return -1; + } + + pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); + + tDecoderInit(&decoder, pBuf, pRpcMsg->contLen); if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) { terrno = TSDB_CODE_INVALID_MSG; goto _err; @@ -1440,7 +1462,11 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid); if (!pInfo) { - smaDebug("vgId:%d, failed to process rsma fetch msg since Empty rsma info", SMA_VID(pSma)); + if (terrno == TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_RSMA_EMPTY_INFO; + } + smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), + req.suid, req.level, terrstr()); goto _err; } @@ -1459,6 +1485,8 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { tdReleaseRSmaInfo(pSma, pInfo); tDecoderClear(&decoder); + smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, + req.level); return TSDB_CODE_SUCCESS; _err: tdReleaseRSmaInfo(pSma, pInfo); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4780d85a30..3c31c893d1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -614,6 +614,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state" TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") diff --git a/tests/system-test/1-insert/create_retentions.py b/tests/system-test/1-insert/create_retentions.py index a2c2254820..f8b2ca71b8 100644 --- a/tests/system-test/1-insert/create_retentions.py +++ b/tests/system-test/1-insert/create_retentions.py @@ -187,7 +187,7 @@ class TDTestCase: tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )') def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, rsma=False, rsma_type="sum"): - tdLog.printNoPrefix("==========step: start inser data into tables now.....") + tdLog.printNoPrefix("==========step: start insert data into tables now.....") # from ...pytest.util.common import DataSet data = DataSet() data.get_order_set(rows)