diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 7839859e8b..9ca67056c6 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -246,7 +246,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag); // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9e46d4e693..d26e99f423 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2658,7 +2658,6 @@ typedef struct { } SVgEpSet; typedef struct { - int64_t refId; int64_t suid; int8_t level; } SRSmaFetchMsg; @@ -2666,7 +2665,6 @@ typedef struct { static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) { if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI64(pCoder, pReq->refId) < 0) return -1; if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; if (tEncodeI8(pCoder, pReq->level) < 0) return -1; @@ -2677,7 +2675,6 @@ static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFe static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) { if (tStartDecode(pCoder) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1; if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; if (tDecodeI8(pCoder, &pReq->level) < 0) return -1; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 20dc04631e..9ddb872007 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -200,6 +200,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) + TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 27fb057b44..eab6e5561f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -610,6 +610,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152) #define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153) #define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) +#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) +#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 302874962e..bf33976c08 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1874,21 +1874,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) * @brief TODO: Assume that the final generated result it less than 3M * * @param pReq - * @param pDataBlocks + * @param pDataBlock * @param vgId - * @param suid // TODO: check with Liao whether suid response is reasonable + * @param suid * - * TODO: colId should be set */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { - int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); + int32_t sz = 1; for (int32_t i = 0; i < sz; ++i) { - SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info; + const SDataBlockInfo* pBlkInfo = &pDataBlock->info; - int32_t numOfCols = taosArrayGetSize(pDataBlocks); - bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols)); + int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); + bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum)); bufSize += sizeof(SSubmitBlk); } @@ -1905,7 +1904,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks tdSRowInit(&rb, pTSchema->version); for (int32_t i = 0; i < sz; ++i) { - SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; // int32_t rowSize = pDataBlock->info.rowSize; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index eca61dd960..1b799b1e5e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -347,6 +347,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 4c6320ecb5..47f7d209b3 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -187,6 +187,7 @@ int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); +int32_t smaProcessFetch(SSma *pSma, void* pMsg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index fd2222c5e4..662558529d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -36,19 +36,17 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputT int8_t level); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); - -static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, - SRSmaStat *pStat, int8_t blkType); -static void tdRSmaFetchTrigger(void *param, void *tmrId); - -static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); -static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); -static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); -static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); - -static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); -static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); -static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); +static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, + int64_t suid, int8_t blkType); +static void tdRSmaFetchTrigger(void *param, void *tmrId); +static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level); +static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); +static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); +static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); +static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); +static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); +static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); +static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) { // adapt accordingly if definition of SRSmaInfo update @@ -604,11 +602,8 @@ _end: return code; } -static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, - SRSmaStat *pStat, int8_t blkType) { - SArray *pResult = NULL; - SSma *pSma = pStat->pSma; - +static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, + int64_t suid, int8_t blkType) { while (1) { SSDataBlock *output = NULL; uint64_t ts; @@ -619,30 +614,20 @@ static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *p pItem->level, terrstr(code)); goto _err; } - if (!output) { - break; - } - if (!pResult) { - pResult = taosArrayInit(1, sizeof(SSDataBlock)); - if (!pResult) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - - taosArrayPush(pResult, output); - - if (taosArrayGetSize(pResult) > 0) { -#if 1 + if (output) { +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); + SArray *pResult = taosArrayInit(1, sizeof(SSDataBlock)); + taosArrayPush(pResult, output); blockDebugShowDataBlocks(pResult, flag); + taosArrayDestroy(pResult); #endif STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq *pReq = NULL; // TODO: the schema update should be handled later(TD-17965) - if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { + if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, pItem->level, terrstr()); goto _err; @@ -659,18 +644,17 @@ static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *p SMA_VID(pSma), suid, pItem->level, output->info.version); taosMemoryFreeClear(pReq); - taosArrayClear(pResult); } else if (terrno == 0) { smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); + break; } else { smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr()); + goto _err; } } - tdDestroySDataBlockArray(pResult); return TSDB_CODE_SUCCESS; _err: - tdDestroySDataBlockArray(pResult); return TSDB_CODE_FAILED; } @@ -694,11 +678,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType return TSDB_CODE_FAILED; } - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, pStat, + tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, STREAM_INPUT__DATA_SUBMIT); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); @@ -724,11 +706,13 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { SRSmaInfo *pRSmaInfo = NULL; if (!pEnv) { + terrno = TSDB_CODE_RSMA_INVALID_ENV; return NULL; } pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); if (!pStat || !RSMA_INFO_HASH(pStat)) { + terrno = TSDB_CODE_RSMA_INVALID_STAT; return NULL; } @@ -743,12 +727,12 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return pRSmaInfo; } + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat SRSmaInfo *pCowRSmaInfo = NULL; @@ -779,7 +763,7 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { ASSERT(!pCowRSmaInfo); } - if(pCowRSmaInfo) { + if (pCowRSmaInfo) { tdRefRSmaInfo(pSma, pCowRSmaInfo); } // unlock @@ -1323,7 +1307,7 @@ _err: } /** - * @brief trigger to get rsma result + * @brief trigger to get rsma result in async mode * * @param param * @param tmrId @@ -1357,8 +1341,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { " refId:%d", SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { - taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle, - &pItem->tmrId); + taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); } return; } @@ -1372,16 +1355,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { case TASK_TRIGGER_STAT_ACTIVE: { smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); - - // sync procedure => async process - - SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - qTaskInfo_t taskInfo = pRSmaInfo->taskInfo[pItem->level - 1]; - qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); - tdRSmaFetchAndSubmitResult(taskInfo, pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, - STREAM_INPUT__DATA_BLOCK); - tdCleanupStreamInputDataBlock(taskInfo); - + // async process + tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", @@ -1404,3 +1379,118 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { _end: tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } + +/** + * @brief put rsma fetch msg to fetch queue + * + * @param pSma + * @param pInfo + * @param level + * @return int32_t + */ +int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { + SRSmaFetchMsg fetchMsg = { .suid = pInfo->suid, .level = level}; + int32_t ret = 0; + int32_t contLen = 0; + SEncoder encoder = {0}; + tEncodeSize(tEncodeSRSmaFetchMsg, &fetchMsg, contLen, ret); + if (ret < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tEncoderClear(&encoder); + goto _err; + } + + void *pBuf = rpcMallocCont(contLen + sizeof(SMsgHead)); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), contLen); + if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tEncoderClear(&encoder); + } + tEncoderClear(&encoder); + + ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma); + ((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead); + + SRpcMsg rpcMsg = { + .code = 0, + .msgType = TDMT_VND_FETCH_RSMA, + .pCont = pBuf, + .contLen = contLen, + }; + + if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, FETCH_QUEUE, &rpcMsg)) != 0) { + smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", + SMA_VID(pSma), pInfo->suid, level, terrstr()); + goto _err; + } + + smaDebug("vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), + pInfo->suid, level); + + return TSDB_CODE_SUCCESS; +_err: + return TSDB_CODE_FAILED; +} + +/** + * @brief fetch rsma data of level 2/3 and submit + * + * @param pSma + * @param pMsg + * @return int32_t + */ +int32_t smaProcessFetch(SSma *pSma, void *pMsg) { + SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; + SRSmaFetchMsg req = {0}; + SDecoder decoder = {0}; + void *pBuf = NULL; + SRSmaInfo *pInfo = NULL; + SRSmaInfoItem *pItem = NULL; + + if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { + terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; + return -1; + } + + pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); + + tDecoderInit(&decoder, pBuf, pRpcMsg->contLen); + if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _err; + } + + pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid); + if (!pInfo) { + if (terrno == TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_RSMA_EMPTY_INFO; + } + smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma), + req.suid, req.level, terrstr()); + goto _err; + } + + pItem = RSMA_INFO_ITEM(pInfo, req.level - 1); + + SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; + qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, req.level - 1); + if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { + goto _err; + } + if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, STREAM_INPUT__DATA_BLOCK) < 0) { + goto _err; + } + + tdCleanupStreamInputDataBlock(taskInfo); + + tdReleaseRSmaInfo(pSma, pInfo); + tDecoderClear(&decoder); + smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, + req.level); + return TSDB_CODE_SUCCESS; +_err: + tdReleaseRSmaInfo(pSma, pInfo); + tDecoderClear(&decoder); + smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); + return TSDB_CODE_FAILED; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 15cf183b2a..0f8ec07016 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -325,6 +325,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); + case TDMT_VND_FETCH_RSMA: + return smaProcessFetch(pVnode->pSma, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4780d85a30..3c31c893d1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -614,6 +614,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state" TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") diff --git a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim index bde56cb862..86bdbdcded 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim @@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10); sql insert into ct1 values(now+1s, 1); sql insert into ct1 values(now+2s, 100); -print =============== wait maxdelay 15+1 seconds for results -sleep 16000 +print =============== wait maxdelay 15+2 seconds for results +sleep 17000 print =============== select * from retention level 2 from memory sql select * from ct1; diff --git a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim index 1b54e5a47d..405d22ebdd 100644 --- a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim +++ b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim @@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10, 10.0); sql insert into ct1 values(now+1s, 1, 1.0); sql insert into ct1 values(now+2s, 100, 100.0); -print =============== wait maxdelay 5+1 seconds for results -sleep 6000 +print =============== wait maxdelay 5+2 seconds for results +sleep 7000 print =============== select * from retention level 2 from memory sql select * from ct1; @@ -135,8 +135,8 @@ print =============== insert after rsma qtaskinfo recovery sql insert into ct1 values(now, 50, 500.0); sql insert into ct1 values(now+1s, 40, 40.0); -print =============== wait maxdelay 5+1 seconds for results -sleep 6000 +print =============== wait maxdelay 5+2 seconds for results +sleep 7000 print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery sql select * from ct1; diff --git a/tests/system-test/1-insert/create_retentions.py b/tests/system-test/1-insert/create_retentions.py index 96f9ea51ef..24c0dfc046 100644 --- a/tests/system-test/1-insert/create_retentions.py +++ b/tests/system-test/1-insert/create_retentions.py @@ -187,7 +187,7 @@ class TDTestCase: tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )') def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, rsma=False, rsma_type="sum"): - tdLog.printNoPrefix("==========step: start inser data into tables now.....") + tdLog.printNoPrefix("==========step: start insert data into tables now.....") # from ...pytest.util.common import DataSet data = DataSet() data.get_order_set(rows)