diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 536273c044..6f0a7b171e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -278,6 +278,7 @@ int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); +int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8da2fff5a6..6dcba568f1 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -669,7 +669,7 @@ _exit: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - int32_t size = sizeof(int32_t) + sizeof(int64_t) + len; + int32_t size = sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t) + len; // type + len + version + payload void *qItem = taosAllocateQitem(size, DEF_QITEM, 0); if (!qItem) { @@ -678,6 +678,8 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p void *pItem = qItem; + *(int8_t *)pItem = (int8_t)inputType; + pItem = POINTER_SHIFT(pItem, sizeof(int8_t)); *(int32_t *)pItem = len; pItem = POINTER_SHIFT(pItem, sizeof(int32_t)); *(int64_t *)pItem = version; @@ -852,7 +854,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, return TSDB_CODE_SUCCESS; } - if (inputType == STREAM_INPUT__DATA_SUBMIT) { + if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) { if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_FAILED; @@ -918,6 +920,25 @@ _err: return TSDB_CODE_FAILED; } +int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + if (!pEnv) { + // only applicable when rsma env exists + return TSDB_CODE_SUCCESS; + } + + if (inputType == STREAM_INPUT__REF_DATA_BLOCK) { + SDeleteRes *pReq = pReq; + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, pReq->suid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); + goto _err; + } + } + return TSDB_CODE_SUCCESS; +_err: + return TSDB_CODE_FAILED; +} + /** * @brief retrieve rsma meta and init * @@ -1203,7 +1224,7 @@ _end: static void tdFreeRSmaSubmitItems(SArray *pItems) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { SPackedData *packData = taosArrayGet(pItems, i); - taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t) - sizeof(int64_t))); + taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int8_t) - sizeof(int32_t) - sizeof(int64_t))); } taosArrayClear(pItems); } @@ -1267,33 +1288,81 @@ _err: } static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { + + void *msg = NULL; + int8_t resume = 0; + int32_t nSubmit = 0; + int32_t nDelete = 0; + + SPackedData packData; + taosArrayClear(pSubmitArr); + while (1) { - void *msg = NULL; taosGetQitem(qall, (void **)&msg); if (msg) { - SPackedData packData = {.msgLen = *(int32_t *)msg, - .ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)), - .msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t))}; + int8_t inputType = *(int8_t *)msg; - if (!taosArrayPush(pSubmitArr, &packData)) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; + msg = POINTER_SHIFT(msg, sizeof(int8_t)); + + if (inputType == STREAM_INPUT__DATA_SUBMIT) { + if (nDelete > 0) { + resume = 1; + break; + } + _resume_submit: + packData.msgLen = *(int32_t *)msg; + packData.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)); + packData.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t)); + if (!taosArrayPush(pSubmitArr, &packData)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + ++nSubmit; + } else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) { + if (nSubmit > 0) { + resume = 2; + break; + } + _resume_delete: + ++nDelete; } + } else { break; } } - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { - goto _err; + if (nSubmit > 0) { + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + goto _err; + } } + tdFreeRSmaSubmitItems(pSubmitArr); } - tdFreeRSmaSubmitItems(pSubmitArr); + } else if (nDelete > 0) { } + + if (resume == 0) { + goto _rtn; + } else if (resume == 1) { + nSubmit = 0; + nDelete = 0; + resume = 0; + taosArrayClear(pSubmitArr); + goto _resume_submit; + } else { + nSubmit = 0; + nDelete = 0; + resume = 0; + taosArrayClear(pSubmitArr); + goto _resume_delete; + } + +_rtn: return TSDB_CODE_SUCCESS; _err: smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 56d8b1ea45..5b5afbb976 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1884,6 +1884,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in if (code) goto _err; } + tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len, STREAM_INPUT__REF_DATA_BLOCK); + tDecoderClear(pCoder); taosArrayDestroy(pRes->uidList); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 60dc6f0185..f73f028758 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -75,6 +75,15 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf taosArrayPush(pInfo->pBlockLists, &tmp); } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; + } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + SPackedData tmp = { + .pDataBlock = pDataBlock, + }; + taosArrayPush(pInfo->pBlockLists, &tmp); + } + pInfo->blockType = STREAM_INPUT__REF_DATA_BLOCK; } return TSDB_CODE_SUCCESS;