fix: send rsma fetch msg to designated vg

This commit is contained in:
Cary Xu 2022-08-08 16:18:05 +08:00
parent 16ccf2ad20
commit 875eabdbf5
5 changed files with 39 additions and 10 deletions

View File

@ -2658,7 +2658,6 @@ typedef struct {
} SVgEpSet;
typedef struct {
int64_t refId;
int64_t suid;
int8_t level;
} SRSmaFetchMsg;
@ -2666,7 +2665,6 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->refId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->level) < 0) return -1;
@ -2677,7 +2675,6 @@ static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFe
static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->level) < 0) return -1;

View File

@ -610,6 +610,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
#define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)

View File

@ -1389,7 +1389,7 @@ _end:
* @return int32_t
*/
int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
SRSmaFetchMsg fetchMsg = {.refId = pInfo->refId, .suid = pInfo->suid, .level = level};
SRSmaFetchMsg fetchMsg = { .suid = pInfo->suid, .level = level};
int32_t ret = 0;
int32_t contLen = 0;
SEncoder encoder = {0};
@ -1400,13 +1400,17 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
goto _err;
}
void *pBuf = rpcMallocCont(contLen);
tEncoderInit(&encoder, pBuf, contLen);
void *pBuf = rpcMallocCont(contLen + sizeof(SMsgHead));
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), contLen);
if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tEncoderClear(&encoder);
}
tEncoderClear(&encoder);
((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead);
SRpcMsg rpcMsg = {
.code = 0,
.msgType = TDMT_VND_FETCH_RSMA,
@ -1415,24 +1419,42 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
};
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) {
smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%d level:%" PRIi8 " since %s",
smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s",
SMA_VID(pSma), pInfo->suid, level, terrstr());
goto _err;
}
smaDebug("vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma),
pInfo->suid, level);
return TSDB_CODE_SUCCESS;
_err:
return TSDB_CODE_FAILED;
}
/**
* @brief fetch rsma data of level 2/3 and submit
*
* @param pSma
* @param pMsg
* @return int32_t
*/
int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
SRSmaFetchMsg req = {0};
SDecoder decoder = {0};
void *pBuf = NULL;
SRSmaInfo *pInfo = NULL;
SRSmaInfoItem *pItem = NULL;
tDecoderInit(&decoder, pRpcMsg->pCont, pRpcMsg->contLen);
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
return -1;
}
pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead));
tDecoderInit(&decoder, pBuf, pRpcMsg->contLen);
if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _err;
@ -1440,7 +1462,11 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid);
if (!pInfo) {
smaDebug("vgId:%d, failed to process rsma fetch msg since Empty rsma info", SMA_VID(pSma));
if (terrno == TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_RSMA_EMPTY_INFO;
}
smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma),
req.suid, req.level, terrstr());
goto _err;
}
@ -1459,6 +1485,8 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
tdReleaseRSmaInfo(pSma, pInfo);
tDecoderClear(&decoder);
smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid,
req.level);
return TSDB_CODE_SUCCESS;
_err:
tdReleaseRSmaInfo(pSma, pInfo);

View File

@ -614,6 +614,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state"
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")

View File

@ -187,7 +187,7 @@ class TDTestCase:
tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')
def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, rsma=False, rsma_type="sum"):
tdLog.printNoPrefix("==========step: start inser data into tables now.....")
tdLog.printNoPrefix("==========step: start insert data into tables now.....")
# from ...pytest.util.common import DataSet
data = DataSet()
data.get_order_set(rows)