From a9d30f25d2e46deb304255abcbc98047a53eb060 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 20 Sep 2023 10:11:03 +0800 Subject: [PATCH 01/19] enh: rsma support delete raw data --- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/sma/smaRollup.c | 101 +++++++++++++++++++++---- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 + source/libs/executor/src/executor.c | 9 +++ 4 files changed, 97 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 536273c044..6f0a7b171e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -278,6 +278,7 @@ int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); +int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8da2fff5a6..6dcba568f1 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -669,7 +669,7 @@ _exit: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - int32_t size = sizeof(int32_t) + sizeof(int64_t) + len; + int32_t size = sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t) + len; // type + len + version + payload void *qItem = taosAllocateQitem(size, DEF_QITEM, 0); if (!qItem) { @@ -678,6 +678,8 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p void *pItem = qItem; + *(int8_t *)pItem = (int8_t)inputType; + pItem = POINTER_SHIFT(pItem, sizeof(int8_t)); *(int32_t *)pItem = len; pItem = POINTER_SHIFT(pItem, sizeof(int32_t)); *(int64_t *)pItem = version; @@ -852,7 +854,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, return TSDB_CODE_SUCCESS; } - if (inputType == STREAM_INPUT__DATA_SUBMIT) { + if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) { if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_FAILED; @@ -918,6 +920,25 @@ _err: return TSDB_CODE_FAILED; } +int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + if (!pEnv) { + // only applicable when rsma env exists + return TSDB_CODE_SUCCESS; + } + + if (inputType == STREAM_INPUT__REF_DATA_BLOCK) { + SDeleteRes *pReq = pReq; + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, pReq->suid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); + goto _err; + } + } + return TSDB_CODE_SUCCESS; +_err: + return TSDB_CODE_FAILED; +} + /** * @brief retrieve rsma meta and init * @@ -1203,7 +1224,7 @@ _end: static void tdFreeRSmaSubmitItems(SArray *pItems) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { SPackedData *packData = taosArrayGet(pItems, i); - taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t) - sizeof(int64_t))); + taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int8_t) - sizeof(int32_t) - sizeof(int64_t))); } taosArrayClear(pItems); } @@ -1267,33 +1288,81 @@ _err: } static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { + + void *msg = NULL; + int8_t resume = 0; + int32_t nSubmit = 0; + int32_t nDelete = 0; + + SPackedData packData; + taosArrayClear(pSubmitArr); + while (1) { - void *msg = NULL; taosGetQitem(qall, (void **)&msg); if (msg) { - SPackedData packData = {.msgLen = *(int32_t *)msg, - .ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)), - .msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t))}; + int8_t inputType = *(int8_t *)msg; - if (!taosArrayPush(pSubmitArr, &packData)) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; + msg = POINTER_SHIFT(msg, sizeof(int8_t)); + + if (inputType == STREAM_INPUT__DATA_SUBMIT) { + if (nDelete > 0) { + resume = 1; + break; + } + _resume_submit: + packData.msgLen = *(int32_t *)msg; + packData.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)); + packData.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t)); + if (!taosArrayPush(pSubmitArr, &packData)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + ++nSubmit; + } else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) { + if (nSubmit > 0) { + resume = 2; + break; + } + _resume_delete: + ++nDelete; } + } else { break; } } - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { - goto _err; + if (nSubmit > 0) { + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + goto _err; + } } + tdFreeRSmaSubmitItems(pSubmitArr); } - tdFreeRSmaSubmitItems(pSubmitArr); + } else if (nDelete > 0) { } + + if (resume == 0) { + goto _rtn; + } else if (resume == 1) { + nSubmit = 0; + nDelete = 0; + resume = 0; + taosArrayClear(pSubmitArr); + goto _resume_submit; + } else { + nSubmit = 0; + nDelete = 0; + resume = 0; + taosArrayClear(pSubmitArr); + goto _resume_delete; + } + +_rtn: return TSDB_CODE_SUCCESS; _err: smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 56d8b1ea45..5b5afbb976 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1884,6 +1884,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in if (code) goto _err; } + tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len, STREAM_INPUT__REF_DATA_BLOCK); + tDecoderClear(pCoder); taosArrayDestroy(pRes->uidList); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 60dc6f0185..f73f028758 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -75,6 +75,15 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf taosArrayPush(pInfo->pBlockLists, &tmp); } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; + } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + SPackedData tmp = { + .pDataBlock = pDataBlock, + }; + taosArrayPush(pInfo->pBlockLists, &tmp); + } + pInfo->blockType = STREAM_INPUT__REF_DATA_BLOCK; } return TSDB_CODE_SUCCESS; From aa3ba74f6e1591c42f81e1a8316194815a4c9c56 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 9 Nov 2023 15:03:14 +0800 Subject: [PATCH 02/19] chore: code optimization --- source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d441b22aa3..b4224e7364 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -605,7 +605,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { 0) return -1; - tsNumOfVnodeRsmaThreads = tsNumOfCores; + tsNumOfVnodeRsmaThreads = tsNumOfCores / 4; tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 22c9cc6260..519f0dde4f 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -21,6 +21,7 @@ #define RSMA_FETCH_DELAY_MAX (120000) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms +#define RSMA_SUBMIT_HEAD_LEN (13) // type(int8_t) + len(int32_t) + version(int64_t) #define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel) @@ -667,7 +668,7 @@ _exit: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - int32_t size = sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t) + len; // type + len + version + payload + int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header(type+len+version) + payload void *qItem = taosAllocateQitem(size, DEF_QITEM, 0); if (!qItem) { @@ -1222,7 +1223,7 @@ _end: static void tdFreeRSmaSubmitItems(SArray *pItems) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { SPackedData *packData = taosArrayGet(pItems, i); - taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int8_t) - sizeof(int32_t) - sizeof(int64_t))); + taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN)); } taosArrayClear(pItems); } From 572eb691fb5afd2974389869621d400ce8b54fc9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 9 Nov 2023 19:51:01 +0800 Subject: [PATCH 03/19] enh: support delete msg for rsma --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/sma/smaCommit.c | 5 +- source/dnode/vnode/src/sma/smaRollup.c | 183 +++++++++++++------------ source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tq/tqUtil.c | 21 +-- source/dnode/vnode/src/vnd/vnodeSvr.c | 6 +- source/libs/executor/src/executor.c | 9 +- 8 files changed, 121 insertions(+), 111 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 675bfa334a..238407b26c 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -160,7 +160,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 137c2f4f7e..df1720d4a7 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -286,8 +286,8 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); -int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); -int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); +int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len); +int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 92b8c09fbc..3512f1476f 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -217,10 +217,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { int32_t lino = 0; SVnode *pVnode = pSma->pVnode; - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { - goto _exit; - } + if (!SMA_RSMA_ENV(pSma)) goto _exit; code = tsdbCommitBegin(VND_RSMA1(pVnode), pInfo); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index dd21ec2b30..6742f30d53 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -43,7 +43,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz ERsmaExecType type, int8_t level); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); -static void tdFreeRSmaSubmitItems(SArray *pItems); +static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, int32_t execType, int8_t *streamFlushed); @@ -723,7 +723,7 @@ _exit: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header(type+len+version) + payload + int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header + payload void *qItem = taosAllocateQitem(size, DEF_QITEM, 0); if (!qItem) { @@ -940,12 +940,8 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, return TSDB_CODE_SUCCESS; } -int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) { - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - if (!pEnv) { - // only applicable when rsma env exists - return TDB_CODE_SUCCESS; - } +int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) { + if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS; if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr()); @@ -954,27 +950,25 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, STbUidStore uidStore = {0}; - if (inputType == STREAM_INPUT__DATA_SUBMIT) { - if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) { - smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr()); + if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) { + smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr()); + goto _err; + } + + if (uidStore.suid != 0) { + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); goto _err; } - if (uidStore.suid != 0) { - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, uidStore.suid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); + void *pIter = NULL; + while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { + tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); + taosHashCancelIterate(uidStore.uidHash, pIter); goto _err; } - - void *pIter = NULL; - while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { - tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, *pTbSuid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); - taosHashCancelIterate(uidStore.uidHash, pIter); - goto _err; - } - } } } tdUidStoreDestory(&uidStore); @@ -984,19 +978,18 @@ _err: return terrno; } -int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) { - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - if (!pEnv) { - // only applicable when rsma env exists - return TSDB_CODE_SUCCESS; +int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) { + if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS; + + if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) { + smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), terrstr()); + goto _err; } - if (inputType == STREAM_INPUT__REF_DATA_BLOCK) { - SDeleteRes *pReq = pReq; - if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, pReq->suid) < 0) { - smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); - goto _err; - } + SDeleteRes *pDelRes = pReq; + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid) < 0) { + smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); + goto _err; } return TSDB_CODE_SUCCESS; _err: @@ -1381,10 +1374,20 @@ _end: tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId); } -static void tdFreeRSmaSubmitItems(SArray *pItems) { - for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { - SPackedData *packData = taosArrayGet(pItems, i); - taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN)); +static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) { + int32_t arrSize = taosArrayGetSize(pItems); + if (type == STREAM_INPUT__MERGED_SUBMIT) { + for (int32_t i = 0; i < arrSize; ++i) { + SPackedData *packData = TARRAY_GET_ELEM(pItems, i); + taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN)); + } + } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { + for (int32_t i = 0; i < arrSize; ++i) { + SPackedData *packData = TARRAY_GET_ELEM(pItems, i); + blockDataDestroy(packData->pDataBlock); + } + } else { + ASSERTS(0, "unknown type:%d", type); } taosArrayClear(pItems); } @@ -1448,34 +1451,37 @@ _err: return TSDB_CODE_FAILED; } +#define RSMA_SUBMIT_MSG_TYPE(msg) (*(int8_t *)(msg)) +#define RSMA_SUBMIT_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t))) +#define RSMA_SUBMIT_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t))) +#define RSMA_SUBMIT_MSG_BODY(msg) (POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t))) + static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { - - void *msg = NULL; - int8_t resume = 0; - int32_t nSubmit = 0; - int32_t nDelete = 0; + void *msg = NULL; + int8_t resume = 0; + int32_t nSubmit = 0; + int32_t nDelete = 0; SPackedData packData; taosArrayClear(pSubmitArr); + // the submitReq/deleteReq msg may exsit alternately in the msg queue, consume them sequentially in batch mode while (1) { taosGetQitem(qall, (void **)&msg); if (msg) { - int8_t inputType = *(int8_t *)msg; - - msg = POINTER_SHIFT(msg, sizeof(int8_t)); - + int8_t inputType = RSMA_SUBMIT_MSG_TYPE(msg); if (inputType == STREAM_INPUT__DATA_SUBMIT) { if (nDelete > 0) { resume = 1; break; } _resume_submit: - packData.msgLen = *(int32_t *)msg; - packData.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)); - packData.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t)); + packData.msgLen = RSMA_SUBMIT_MSG_LEN(msg); + packData.ver = RSMA_SUBMIT_MSG_VER(msg); + packData.msgStr = RSMA_SUBMIT_MSG_BODY(msg); if (!taosArrayPush(pSubmitArr, &packData)) { + taosFreeQitem(msg); terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -1486,46 +1492,49 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA break; } _resume_delete: - ++nDelete; -#if 0 - if (!taosArrayPush(pSubmitArr, &packData)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } -#endif - } else { - break; - } - } - - if (nSubmit > 0) { - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + extractDelDataBlock(RSMA_SUBMIT_MSG_BODY(msg), RSMA_SUBMIT_MSG_LEN(msg), RSMA_SUBMIT_MSG_VER(msg), + &packData.pDataBlock, 1); + if (!taosArrayPush(pSubmitArr, &packData)) { + taosFreeQitem(msg); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + taosFreeQitem(msg); + ++nDelete; + } else { + ASSERTS(0, "unknown msg type:%d", inputType); + break; } - tdFreeRSmaSubmitItems(pSubmitArr); } - } else if (nDelete > 0) { - } - if (resume == 0) { - goto _rtn; - } else if (resume == 1) { - nSubmit = 0; - nDelete = 0; - resume = 0; - taosArrayClear(pSubmitArr); - goto _resume_submit; - } else { - nSubmit = 0; - nDelete = 0; - resume = 0; - taosArrayClear(pSubmitArr); - goto _resume_delete; + if (nSubmit > 0 || nDelete > 0) { + int32_t size = TARRAY_SIZE(pSubmitArr); + if (size > 0) { + int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) { + goto _err; + } + } + tdFreeRSmaSubmitItems(pSubmitArr, inputType); + } + } + + if (resume == 0) { + goto _rtn; + } else if (resume == 1) { + nSubmit = 0; + nDelete = 0; + resume = 0; + tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__REF_DATA_BLOCK); + goto _resume_submit; + } else { + nSubmit = 0; + nDelete = 0; + resume = 0; + tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__MERGED_SUBMIT); + goto _resume_delete; + } } _rtn: @@ -1534,7 +1543,7 @@ _err: atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno); smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr()); - tdFreeRSmaSubmitItems(pSubmitArr); + tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK); while (1) { void *msg = NULL; taosGetQitem(qall, (void **)&msg); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index bd2a591a98..41b1aa7bd1 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -343,7 +343,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); int32_t len = pCont->bodyLen - sizeof(SMsgHead); - code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + code = extractDelDataBlock(pBody, len, ver, (void**)pItem, 0); if (code == TSDB_CODE_SUCCESS) { if (*pItem == NULL) { tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d8fe899bf6..f7d0ad54e9 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -399,7 +399,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* return 0; } -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) { +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type) { SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; @@ -442,14 +442,19 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream } taosArrayDestroy(pRes->uidList); - *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - if (*pRefBlock == NULL) { - blockDataCleanup(pDelBlock); - taosMemoryFree(pDelBlock); - return TSDB_CODE_OUT_OF_MEMORY; + if (type == 0) { + *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); + if (*pRefBlock == NULL) { + blockDataCleanup(pDelBlock); + taosMemoryFree(pDelBlock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK; + ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock; + } else { + *pRefBlock = pDelBlock; } - (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; - (*pRefBlock)->pBlock = pDelBlock; return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 44b6af4ff5..df42e02776 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -380,7 +380,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { SEncoder *pCoder = &(SEncoder){0}; SDeleteRes res = {0}; - SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .skipRollup = 1}; initStorageAPI(&handle.api); code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); @@ -1669,7 +1669,7 @@ _exit: atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); if (code == 0) { atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1); - code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT); + code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len); } // clear @@ -1947,7 +1947,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in if (code) goto _err; } - tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len, STREAM_INPUT__REF_DATA_BLOCK); + tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len); tDecoderClear(pCoder); taosArrayDestroy(pRes->uidList); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d569f78322..1f82a9477b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -78,12 +78,11 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf taosArrayPush(pInfo->pBlockLists, &tmp); pInfo->blockType = STREAM_INPUT__CHECKPOINT; } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { - for (int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SPackedData tmp = {.pDataBlock = pDataBlock}; - taosArrayPush(pInfo->pBlockLists, &tmp); + for (int32_t i = 0; i < numOfBlocks; ++i) { + SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); + taosArrayPush(pInfo->pBlockLists, pReq); } - pInfo->blockType = STREAM_INPUT__REF_DATA_BLOCK; + pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } return TSDB_CODE_SUCCESS; From ff3621f38224cc6e9be85eb6b161d5172a9bb9a6 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 9 Nov 2023 20:09:39 +0800 Subject: [PATCH 04/19] enh: rsma delete logic --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 54 +++++++++++++------------- source/dnode/vnode/src/tq/tqUtil.c | 6 ++- 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 238407b26c..6b4f8d1431 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -160,7 +160,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type); +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6742f30d53..d530133159 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -17,13 +17,17 @@ #include "tq.h" #include "tstream.h" -#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt -#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_FETCH_DELAY_MAX (120000) // ms -#define RSMA_FETCH_ACTIVE_MAX (1000) // ms -#define RSMA_FETCH_INTERVAL (5000) // ms -#define RSMA_SUBMIT_HEAD_LEN (13) // type(int8_t) + len(int32_t) + version(int64_t) -#define RSMA_TASK_FLAG "rsma" +#define RSMA_EXEC_SMOOTH_SIZE (100) // cnt +#define RSMA_EXEC_BATCH_SIZE (1024) // cnt +#define RSMA_FETCH_DELAY_MAX (120000) // ms +#define RSMA_FETCH_ACTIVE_MAX (1000) // ms +#define RSMA_FETCH_INTERVAL (5000) // ms +#define RSMA_EXEC_MSG_HLEN (13) // type(int8_t) + len(int32_t) + version(int64_t) +#define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg)) +#define RSMA_EXEC_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t))) +#define RSMA_EXEC_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t))) +#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t))) +#define RSMA_TASK_FLAG "rsma" #define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel) @@ -723,7 +727,7 @@ _exit: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - int32_t size = RSMA_SUBMIT_HEAD_LEN + len; // header + payload + int32_t size = RSMA_EXEC_MSG_HLEN + len; // header + payload void *qItem = taosAllocateQitem(size, DEF_QITEM, 0); if (!qItem) { @@ -752,7 +756,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p } // smoothing consume - int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE; + int32_t n = nItems / RSMA_EXEC_SMOOTH_SIZE; if (n > 1) { if (n > 10) { n = 10; @@ -1379,7 +1383,7 @@ static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) { if (type == STREAM_INPUT__MERGED_SUBMIT) { for (int32_t i = 0; i < arrSize; ++i) { SPackedData *packData = TARRAY_GET_ELEM(pItems, i); - taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_SUBMIT_HEAD_LEN)); + taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_EXEC_MSG_HLEN)); } } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { for (int32_t i = 0; i < arrSize; ++i) { @@ -1451,11 +1455,6 @@ _err: return TSDB_CODE_FAILED; } -#define RSMA_SUBMIT_MSG_TYPE(msg) (*(int8_t *)(msg)) -#define RSMA_SUBMIT_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t))) -#define RSMA_SUBMIT_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t))) -#define RSMA_SUBMIT_MSG_BODY(msg) (POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t))) - static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { void *msg = NULL; int8_t resume = 0; @@ -1470,16 +1469,16 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA while (1) { taosGetQitem(qall, (void **)&msg); if (msg) { - int8_t inputType = RSMA_SUBMIT_MSG_TYPE(msg); + int8_t inputType = RSMA_EXEC_MSG_TYPE(msg); if (inputType == STREAM_INPUT__DATA_SUBMIT) { if (nDelete > 0) { resume = 1; break; } _resume_submit: - packData.msgLen = RSMA_SUBMIT_MSG_LEN(msg); - packData.ver = RSMA_SUBMIT_MSG_VER(msg); - packData.msgStr = RSMA_SUBMIT_MSG_BODY(msg); + packData.msgLen = RSMA_EXEC_MSG_LEN(msg); + packData.ver = RSMA_EXEC_MSG_VER(msg); + packData.msgStr = RSMA_EXEC_MSG_BODY(msg); if (!taosArrayPush(pSubmitArr, &packData)) { taosFreeQitem(msg); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1492,8 +1491,11 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA break; } _resume_delete: - extractDelDataBlock(RSMA_SUBMIT_MSG_BODY(msg), RSMA_SUBMIT_MSG_LEN(msg), RSMA_SUBMIT_MSG_VER(msg), - &packData.pDataBlock, 1); + if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), RSMA_EXEC_MSG_VER(msg), + &packData.pDataBlock, 1))) { + taosFreeQitem(msg); + goto _err; + } if (!taosArrayPush(pSubmitArr, &packData)) { taosFreeQitem(msg); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1518,17 +1520,17 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA } tdFreeRSmaSubmitItems(pSubmitArr, inputType); } + } else { + goto _rtn; } - if (resume == 0) { - goto _rtn; - } else if (resume == 1) { + if (resume == 1) { nSubmit = 0; nDelete = 0; resume = 0; tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__REF_DATA_BLOCK); goto _resume_submit; - } else { + } else if (resume == 2) { nSubmit = 0; nDelete = 0; resume = 0; @@ -1580,7 +1582,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } if (!(pSubmitArr = - taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) { + taosArrayInit(TMIN(RSMA_EXEC_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f7d0ad54e9..8f62928d22 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -399,7 +399,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* return 0; } -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int8_t type) { +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; @@ -452,8 +452,10 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK; ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock; - } else { + } else if (type == 1) { *pRefBlock = pDelBlock; + } else { + ASSERTS(0, "unknown type:%d", type); } return TSDB_CODE_SUCCESS; From 7288b0fc770dffa888ec50833df1929f0c0d17c9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 9 Nov 2023 20:16:29 +0800 Subject: [PATCH 05/19] enh: rsma delete error code --- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index d530133159..e1cada4665 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -997,7 +997,7 @@ int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, } return TSDB_CODE_SUCCESS; _err: - return TSDB_CODE_FAILED; + return terrno; } /** diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index df42e02776..fbd59203dc 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1947,7 +1947,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in if (code) goto _err; } - tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len); + code = tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len); tDecoderClear(pCoder); taosArrayDestroy(pRes->uidList); From ee6e68a71e70cc84434d046f3e20bdc9f6d8d548 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 07:02:46 +0800 Subject: [PATCH 06/19] enh: rsma support delete --- source/common/src/tdatablock.c | 4 ++ source/dnode/vnode/src/sma/smaRollup.c | 78 +++++++++++++++++++++----- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 ++ 3 files changed, 73 insertions(+), 14 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bf21b2eda0..4b777dac82 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1978,6 +1978,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; + if(pDataBlock->info.type == STREAM_DELETE_RESULT) { + + } + if (colNum <= 1) { // invalid if only with TS col continue; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e1cada4665..d0ef36d17f 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -629,6 +629,46 @@ _end: return code; } +static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) { + int32_t code = 0; + int32_t lino = 0; + + if (taosArrayGetSize(pDelReq->deleteReqs) > 0) { + int32_t len = 0; + tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code); + TSDB_CHECK_CODE(code, lino, _exit); + + void *pBuf = rpcMallocCont(len + sizeof(SMsgHead)); + if (!pBuf) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + SEncoder encoder; + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len); + tEncodeSBatchDeleteReq(&encoder, pDelReq); + tEncoderClear(&encoder); + + ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode); + + SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, + .pCont = pBuf, + .contLen = len + sizeof(SMsgHead), + .info.wrapper = level == 1 ? VND_RSMA1(pSma->pVnode) : VND_RSMA2(pSma->pVnode)}; + code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + taosArrayDestroy(pDelReq->deleteReqs); + if (code) { + smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s", + SMA_VID(pSma), lino, suid, level, tstrerror(code)); + } + + return code; +} + static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, int32_t execType, int8_t *streamFlushed) { int32_t code = 0; @@ -657,10 +697,25 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma if (output->info.type == STREAM_CHECKPOINT) { if (streamFlushed) *streamFlushed = 1; continue; + } else if (output->info.type == STREAM_DELETE_RESULT) { + SBatchDeleteReq *pDeleteReq = NULL; + pDeleteReq->suid = suid; + pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); + if (!pDeleteReq->deleteReqs) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + code = tqBuildDeleteReq("", output, pDeleteReq, ""); + TSDB_CHECK_CODE(code, lino, _exit); + code = tdRSmaProcessDelReq(pSma, suid, pItem->level, pDeleteReq); + TSDB_CHECK_CODE(code, lino, _exit); + continue; } + smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), output->info.id.uid, output->info.id.groupId, output->info.rows); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq2 *pReq = NULL; @@ -669,7 +724,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma TSDB_CHECK_CODE(code, lino, _exit); } - // reset the output version to handle reboot + // reset the output version when reboot if (STREAM_GET_ALL == execType && output->info.version == 0) { // the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously output->info.version = pItem->submitReqVer; @@ -1511,30 +1566,25 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA if (nSubmit > 0 || nDelete > 0) { int32_t size = TARRAY_SIZE(pSubmitArr); - if (size > 0) { - int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) { - goto _err; - } + ASSERTS(size > 0, "size is %d", size); + int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) { + goto _err; } - tdFreeRSmaSubmitItems(pSubmitArr, inputType); } + tdFreeRSmaSubmitItems(pSubmitArr, inputType); + nSubmit = 0; + nDelete = 0; } else { goto _rtn; } if (resume == 1) { - nSubmit = 0; - nDelete = 0; resume = 0; - tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__REF_DATA_BLOCK); goto _resume_submit; } else if (resume == 2) { - nSubmit = 0; - nDelete = 0; resume = 0; - tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__MERGED_SUBMIT); goto _resume_delete; } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fbd59203dc..0eb6448fca 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -412,6 +412,10 @@ static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; int32_t lino = 0; + if (pMsg->info.wrapper) { // skip for rsma + return code; + } + int64_t ctimeMs = taosGetTimestampMs(); SBatchDeleteReq pReq = {0}; SDecoder *pCoder = &(SDecoder){0}; @@ -1905,6 +1909,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); } + if() code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs); if (code < 0) { terrno = code; From 674e878ffad5e0d983db5af7742318a301a0fb41 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 14:44:29 +0800 Subject: [PATCH 07/19] enh: rsma support delete --- include/common/tmsg.h | 1 + source/common/src/tdatablock.c | 4 - source/common/src/tmsg.c | 4 + source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 57 ++- source/dnode/vnode/src/sma/smaTimeRange.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 36 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 29 +- source/libs/executor/src/scanoperator.c | 6 + tests/parallel_test/cases.task | 1 + .../tsim/sma/rsmaCreateInsertQueryDelete.sim | 446 ++++++++++++++++++ tests/script/tsim/testsuit.sim | 1 + tests/script/win-test-file | 1 + 13 files changed, 531 insertions(+), 59 deletions(-) create mode 100644 tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4ef4273631..86d34502c6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3774,6 +3774,7 @@ typedef struct { int64_t suid; SArray* deleteReqs; // SArray int64_t ctimeMs; // fill by vnode + int8_t level; // 0 tsdb(default), 1 rsma1 , 2 rsma2 } SBatchDeleteReq; int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4b777dac82..bf21b2eda0 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1978,10 +1978,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; - if(pDataBlock->info.type == STREAM_DELETE_RESULT) { - - } - if (colNum <= 1) { // invalid if only with TS col continue; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 34b789fef8..dc3ba7934f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8337,6 +8337,7 @@ int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq) if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1; } if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->level) < 0) return -1; return 0; } @@ -8361,6 +8362,9 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) { if (!tDecodeIsEnd(pDecoder)) { if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1; } + if (!tDecodeIsEnd(pDecoder)) { + if (tDecodeI8(pDecoder, &pReq->level) < 0) return -1; + } return 0; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 6b4f8d1431..fdd449bf36 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -147,7 +147,7 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink -int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, +int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr); void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index d0ef36d17f..99f5283df3 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -43,8 +43,8 @@ static void tdUidStoreDestory(STbUidStore *pStore); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, - ERsmaExecType type, int8_t level); +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, + SRSmaInfo *pInfo, ERsmaExecType type, int8_t level); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type); @@ -654,7 +654,7 @@ static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatc SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead), - .info.wrapper = level == 1 ? VND_RSMA1(pSma->pVnode) : VND_RSMA2(pSma->pVnode)}; + .info.ahandle = level == 1 ? VND_RSMA1(pSma->pVnode) : VND_RSMA2(pSma->pVnode)}; code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg); TSDB_CHECK_CODE(code, lino, _exit); } @@ -698,16 +698,15 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma if (streamFlushed) *streamFlushed = 1; continue; } else if (output->info.type == STREAM_DELETE_RESULT) { - SBatchDeleteReq *pDeleteReq = NULL; - pDeleteReq->suid = suid; - pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - if (!pDeleteReq->deleteReqs) { + SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level}; + deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); + if (!deleteReq.deleteReqs) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } - code = tqBuildDeleteReq("", output, pDeleteReq, ""); + code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, ""); TSDB_CHECK_CODE(code, lino, _exit); - code = tdRSmaProcessDelReq(pSma, suid, pItem->level, pDeleteReq); + code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq); TSDB_CHECK_CODE(code, lino, _exit); continue; } @@ -715,6 +714,19 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), output->info.id.uid, output->info.id.groupId, output->info.rows); + if (STREAM_GET_ALL == execType) { + /** + * 1. reset the output version when reboot + * 2. delete msg version not updated from the result + */ + if (output->info.version < pItem->submitReqVer) { + // submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously + output->info.version = pItem->submitReqVer; + } else if (output->info.version == pItem->fetchResultVer) { + ASSERTS(0, "duplicated fetch version:%" PRIi64, pItem->fetchResultVer); + continue; + } + } STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq2 *pReq = NULL; @@ -724,12 +736,6 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma TSDB_CHECK_CODE(code, lino, _exit); } - // reset the output version when reboot - if (STREAM_GET_ALL == execType && output->info.version == 0) { - // the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously - output->info.version = pItem->submitReqVer; - } - if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) { // TODO: reconfigure SSubmitReq2 @@ -858,7 +864,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { * @param level * @return int32_t */ -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) { int32_t idx = level - 1; void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); @@ -878,22 +884,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64 " nMsg:%d", SMA_VID(pSma), level, RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize); -#if 0 - for (int32_t i = 0; i < msgSize; ++i) { - SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *)); - smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version); - tdRsmaPrintSubmitReq(pSma, pReq); - } -#endif if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) { smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } - if (STREAM_INPUT__MERGED_SUBMIT == inputType) { - SPackedData *packData = POINTER_SHIFT(pMsg, sizeof(SPackedData) * (msgSize - 1)); - atomic_store_64(&pItem->submitReqVer, packData->ver); - } + atomic_store_64(&pItem->submitReqVer, version); terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL); @@ -1515,6 +1511,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA int8_t resume = 0; int32_t nSubmit = 0; int32_t nDelete = 0; + int64_t version = 0; SPackedData packData; @@ -1534,6 +1531,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA packData.msgLen = RSMA_EXEC_MSG_LEN(msg); packData.ver = RSMA_EXEC_MSG_VER(msg); packData.msgStr = RSMA_EXEC_MSG_BODY(msg); + version = packData.ver; if (!taosArrayPush(pSubmitArr, &packData)) { taosFreeQitem(msg); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1546,7 +1544,8 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA break; } _resume_delete: - if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), RSMA_EXEC_MSG_VER(msg), + version = RSMA_EXEC_MSG_VER(msg); + if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, &packData.pDataBlock, 1))) { taosFreeQitem(msg); goto _err; @@ -1569,7 +1568,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA ASSERTS(size > 0, "size is %d", size); int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i) < 0) { goto _err; } } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 08ddc4bd7b..94ff5ef6b3 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -188,7 +188,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, ""); + tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, ""); continue; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 4b64737936..80ce867e70 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -43,7 +43,7 @@ static SArray* createDefaultTagColName(); static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid); -int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, +int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { int32_t totalRows = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); @@ -53,30 +53,44 @@ int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName); + char tbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE]; for (int32_t row = 0; row < totalRows; row++) { int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row); int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); - char* name; - void* varTbName = NULL; + char* name = NULL; + char* pName = NULL; + void* varTbName = NULL; + tbName[0] = '\0'; if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) { varTbName = colDataGetVarData(pTbNameCol, row); } if (varTbName != NULL && varTbName != (void*)-1) { - name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); - memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); + name = varDataVal(varTbName); + } else if (stbFullName) { + pName = buildCtbNameByGroupId(stbFullName, groupId); + name = pName; } else { - name = buildCtbNameByGroupId(stbFullName, groupId); + metaGetTableNameByUid(pTq->pVnode, groupId, tbName); + name = varDataVal(tbName); } - tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, - pIdStr, groupId, name, skey, ekey); + if (!name || *name == '\0') { + tqError("s-task:%s build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64 + " failed since invalid tbname:%s", + pIdStr, groupId, name, skey, ekey, name ? name : "NULL"); + taosArrayDestroy(deleteReq->deleteReqs); + return -1; + } - SSingleDeleteReq req = { .startTs = skey, .endTs = ekey}; + tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr, groupId, + name, skey, ekey); + + SSingleDeleteReq req = {.startTs = skey, .endTs = ekey}; strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); - taosMemoryFree(name); + if (pName) taosMemoryFree(pName); taosArrayPush(deleteReq->deleteReqs, &req); } @@ -345,7 +359,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* int64_t suid) { SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; - int32_t code = tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); + int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0eb6448fca..c219508cd0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -412,10 +412,6 @@ static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; int32_t lino = 0; - if (pMsg->info.wrapper) { // skip for rsma - return code; - } - int64_t ctimeMs = taosGetTimestampMs(); SBatchDeleteReq pReq = {0}; SDecoder *pCoder = &(SDecoder){0}; @@ -1890,6 +1886,13 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe SMetaReader mr = {0}; metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK); + STsdb *pTsdb = pVnode->pTsdb; + + if (deleteReq.level == 1) { + pTsdb = VND_RSMA1(pVnode); + } else { + pTsdb = VND_RSMA2(pVnode); + } int32_t sz = taosArrayGetSize(deleteReq.deleteReqs); for (int32_t i = 0; i < sz; i++) { @@ -1902,22 +1905,22 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe int64_t uid = mr.me.uid; - int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); + int32_t code = tsdbDeleteTableData(pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); if (code < 0) { terrno = code; vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64, TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); } - if() - code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs); - if (code < 0) { - terrno = code; - vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 - ", end ts:%" PRId64, - TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); + if (deleteReq.level == 0) { + code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs); + if (code < 0) { + terrno = code; + vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 + ", end ts:%" PRId64, + TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); + } } - tDecoderClear(&mr.coder); } metaReaderClear(&mr); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c47e14ad0d..c7f0bb3895 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2105,6 +2105,12 @@ FETCH_NEXT_BLOCK: SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = pPacked->pDataBlock; + + if(!pBlock) { + doClearBufferedBlocks(pInfo); + return NULL; + } + if (pBlock->info.parTbName[0]) { pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName); } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index d482e0fde9..3a1b935ee6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1180,6 +1180,7 @@ e ,,y,script,./test.sh -f tsim/sma/sma_leak.sim ,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim +,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim ,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim ,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim ,,n,script,./test.sh -f tsim/valgrind/checkError1.sim diff --git a/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim b/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim new file mode 100644 index 0000000000..5f5c840eb6 --- /dev/null +++ b/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim @@ -0,0 +1,446 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database with retentions +sql create database d0 retentions -:7d,10s:21d,15s:365d; +sql use d0 + +print =============== create super table and register rsma +sql create table if not exists stb (ts timestamp, c1 float, c2 double) tags (city binary(20),district binary(20)) rollup(sum) max_delay 1s,1s; + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +sql create table ct1 using stb tags("BeiJing", "ChaoYang"); + +sql show tables +if $rows != 1 then + return -1 +endi + +print =============== insert data and trigger rollup +sql insert into ct1 values(now, 10, NULL); +sql insert into ct1 values(now+60m, 1, NULL); +sql insert into ct1 values(now+120m, 100, NULL); + +print =============== wait 5 seconds for results +sleep 5000 + +print =============== select * from retention level 2 from memory +sql select * from ct1; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +if $rows != 3 then + print retention level 2 file rows $rows != 3 + return -1 +endi + +if $data01 != 10.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi +if $data11 != 1.00000 then + return -1 +endi +if $data12 != NULL then + return -1 +endi +if $data21 != 100.00000 then + return -1 +endi +if $data22 != NULL then + return -1 +endi + +print =============== select * from retention level 1 from memory +sql select * from ct1 where ts > now-8d; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 + +if $rows != 3 then + print retention level 2 file rows $rows != 3 + return -1 +endi + +if $data01 != 10.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi +if $data11 != 1.00000 then + return -1 +endi +if $data12 != NULL then + return -1 +endi +if $data21 != 100.00000 then + return -1 +endi +if $data22 != NULL then + return -1 +endi + +print =============== select * from retention level 0 from memory +sql select * from ct1 where ts > now-3d; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 + +if $rows != 3 then + print retention level 2 file rows $rows != 3 + return -1 +endi + +if $data01 != 10.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi +if $data11 != 1.00000 then + return -1 +endi +if $data12 != NULL then + return -1 +endi +if $data21 != 100.00000 then + return -1 +endi +if $data22 != NULL then + return -1 +endi + +print =============== delete row 0 +sql delete from ct1 where ts < now; +sql delete from ct1 where ts < now; +sql delete from ct1 where ts < now; +print =============== wait 5 seconds for results +sleep 5000 + +print =============== select * from retention level 2 from memory after delete row 0 +sql select * from ct1; +print $data00 $data01 $data02 +print $data10 $data11 $data12 + +if $rows != 2 then + print retention level 2 file rows $rows != 2 + return -1 +endi + +if $data01 != 1.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi +if $data11 != 100.00000 then + return -1 +endi +if $data12 != NULL then + return -1 +endi + +print =============== select * from retention level 1 from memory after delete row 0 +sql select * from ct1 where ts > now-8d; +print $data00 $data01 $data02 +print $data10 $data11 $data12 + +if $rows != 2 then + print retention level 2 file rows $rows != 2 + return -1 +endi + +if $data01 != 1.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi +if $data11 != 100.00000 then + return -1 +endi +if $data12 != NULL then + return -1 +endi + +print =============== select * from retention level 0 from memory after delete row 0 +sql select * from ct1 where ts > now-3d; +print $data00 $data01 $data02 +print $data10 $data11 $data12 + +if $rows != 2 then + print retention level 2 file rows $rows != 2 + return -1 +endi + +if $data01 != 1.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi +if $data11 != 100.00000 then + return -1 +endi +if $data12 != NULL then + return -1 +endi + +print =============== delete row 1 +sql delete from ct1 where ts < now; +sql delete from ct1 where ts < now; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 60m; + +print =============== wait 5 seconds for results +sleep 5000 + +print =============== select * from retention level 2 from memory after delete row 1 +sql select * from ct1; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +print =============== select * from retention level 1 from memory after delete row 1 +sql select * from ct1 where ts > now-8d; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +print =============== select * from retention level 0 from memory after delete row 1 +sql select * from ct1 where ts > now-3d; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +#=================================================================== +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start +print =============== wait 5 seconds for results after reboot +sleep 5000 + +print =============== select * from retention level 2 from memory after reboot +sql select * from ct1; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +print =============== select * from retention level 1 from memory after reboot +sql select * from ct1 where ts > now-8d; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +print =============== select * from retention level 0 from memory after reboot +sql select * from ct1 where ts > now-3d; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +#==================== flush database to trigger commit data to file +sql flush database d0; +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + +print =============== select * from retention level 2 from file +sql select * from ct1 where ts > now-365d; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +print =============== select * from retention level 1 from file +sql select * from ct1 where ts > now-8d; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +print =============== select * from retention level 0 from file +sql select * from ct1 where ts > now-3d; +print $data00 $data01 $data02 + +if $rows != 1 then + print retention level 2 file rows $rows != 1 + return -1 +endi + +if $data01 != 100.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi + +print =============== delete row 2 +sql delete from ct1 where ts < now; +sql delete from ct1 where ts < now; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now + 120m; +sql delete from ct1 where ts < now + 200m; +sql delete from ct1 where ts < now + 300m; +sql delete from ct1 where ts < now + 60m; +sql delete from ct1 where ts < now; + +print =============== wait 5 seconds for results +sleep 5000 + +print =============== select * from retention level 2 from memory after delete row 2 +sql select * from ct1; +print $data00 $data01 $data02 + +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + +print =============== select * from retention level 1 from memory after delete row 2 +sql select * from ct1 where ts > now-8d; +print $data00 $data01 $data02 + +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + +print =============== select * from retention level 0 from memory after delete row 2 +sql select * from ct1 where ts > now-3d; +print $data00 $data01 $data02 + +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + +#=================================================================== +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start +print =============== wait 5 seconds for results after reboot +sleep 5000 + +print =============== select * from retention level 2 from memory after delete row 2 +sql select * from ct1; +print $data00 $data01 $data02 + +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + +print =============== select * from retention level 1 from memory after delete row 2 +sql select * from ct1 where ts > now-8d; +print $data00 $data01 $data02 + +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + +print =============== select * from retention level 0 from memory after delete row 2 +sql select * from ct1 where ts > now-3d; +print $data00 $data01 $data02 + +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + +#=================================================================== +system sh/exec.sh -n dnode1 -s stop -x SIGINT +#=================================================================== \ No newline at end of file diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index 0abe56ab3c..c208a07488 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -130,5 +130,6 @@ run tsim/sync/3Replica1VgElect.sim run tsim/sync/threeReplica1VgElectWihtInsert.sim run tsim/sma/tsmaCreateInsertQuery.sim run tsim/sma/rsmaCreateInsertQuery.sim +run tsim/sma/rsmaCreateInsertQueryDelete.sim run tsim/valgrind/basic.sim run tsim/valgrind/checkError.sim \ No newline at end of file diff --git a/tests/script/win-test-file b/tests/script/win-test-file index fe5f5c39e3..b2d50ade8a 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -319,6 +319,7 @@ ./test.sh -f tsim/sma/sma_leak.sim ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim +./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim ./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim ./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim ./test.sh -f tsim/valgrind/checkError1.sim From 5fae10bc760f9758b48baeab1dedd2106327ebcf Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 15:09:01 +0800 Subject: [PATCH 08/19] enh: error code when build delete req --- source/common/src/tdatablock.c | 3 +++ source/common/src/tname.c | 5 ++++- source/dnode/vnode/src/sma/smaTimeRange.c | 3 ++- source/dnode/vnode/src/tq/tqSink.c | 15 ++++++++++----- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bf21b2eda0..054cff560f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2121,6 +2121,7 @@ _end: char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (!pBuf) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf); @@ -2133,6 +2134,7 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) { if (stbFullName[0] == 0) { + terrno = TSDB_CODE_INVALID_PARA; return TSDB_CODE_FAILED; } @@ -2142,6 +2144,7 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha } if (cname == NULL) { + terrno = TSDB_CODE_INVALID_PARA; taosArrayDestroy(tags); return TSDB_CODE_FAILED; } diff --git a/source/common/src/tname.c b/source/common/src/tname.c index c6210ca8c9..4fe2beb6aa 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -296,7 +296,10 @@ static int compareKv(const void* p1, const void* p2) { void buildChildTableName(RandTableName* rName) { SStringBuilder sb = {0}; taosStringBuilderAppendStringLen(&sb, rName->stbFullName, rName->stbFullNameLen); - if (sb.buf == NULL) return; + if (sb.buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return; + } taosArraySort(rName->tags, compareKv); for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) { taosStringBuilderAppendChar(&sb, ','); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 94ff5ef6b3..289986e01f 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -188,7 +188,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, ""); + code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, ""); + TSDB_CHECK_CODE(code, lino, _exit); continue; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 80ce867e70..cf77679478 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -45,6 +45,7 @@ static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSData int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { + int32_t code = 0; int32_t totalRows = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -73,16 +74,20 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p pName = buildCtbNameByGroupId(stbFullName, groupId); name = pName; } else { - metaGetTableNameByUid(pTq->pVnode, groupId, tbName); - name = varDataVal(tbName); + if (metaGetTableNameByUid(pTq->pVnode, groupId, tbName) == 0) { + name = varDataVal(tbName); + } else { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } } if (!name || *name == '\0') { tqError("s-task:%s build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64 " failed since invalid tbname:%s", - pIdStr, groupId, name, skey, ekey, name ? name : "NULL"); + pIdStr, groupId, skey, ekey, name ? name : "NULL"); taosArrayDestroy(deleteReq->deleteReqs); - return -1; + code = terrno ? terrno : TSDB_CODE_APP_ERROR; + return code; } tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr, groupId, @@ -95,7 +100,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p taosArrayPush(deleteReq->deleteReqs, &req); } - return 0; + return code; } static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { From b1bca4e1e2e89fddfd2023020679d0f69d93ddff Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 15:12:10 +0800 Subject: [PATCH 09/19] fix: batch delete logic --- source/dnode/vnode/src/vnd/vnodeSvr.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d0c0e5f65e..29db7d602f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1893,10 +1893,8 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK); STsdb *pTsdb = pVnode->pTsdb; - if (deleteReq.level == 1) { - pTsdb = VND_RSMA1(pVnode); - } else { - pTsdb = VND_RSMA2(pVnode); + if (deleteReq.level) { + pTsdb = deleteReq.level == 1 ? VND_RSMA1(pVnode) : VND_RSMA2(pVnode); } int32_t sz = taosArrayGetSize(deleteReq.deleteReqs); From d5d8568bad8986252cd3208f6e4ad44e6324344f Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 15:30:49 +0800 Subject: [PATCH 10/19] fix: skip delete msg affects 0 row --- source/dnode/vnode/src/sma/smaRollup.c | 8 ++++++-- source/libs/executor/src/scanoperator.c | 5 ----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 99f5283df3..6d2f76bbe9 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1550,13 +1550,17 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA taosFreeQitem(msg); goto _err; } - if (!taosArrayPush(pSubmitArr, &packData)) { + + if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) { taosFreeQitem(msg); terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } taosFreeQitem(msg); - ++nDelete; + if (packData.pDataBlock) { + // packData.pDataBlock is NULL if delete affects 0 row + ++nDelete; + } } else { ASSERTS(0, "unknown msg type:%d", inputType); break; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c7f0bb3895..08138e7700 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2106,11 +2106,6 @@ FETCH_NEXT_BLOCK: SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = pPacked->pDataBlock; - if(!pBlock) { - doClearBufferedBlocks(pInfo); - return NULL; - } - if (pBlock->info.parTbName[0]) { pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName); } From 57f7ff5b9c84b64f1bb7a9ab679934f15e9cd5f9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 15:34:41 +0800 Subject: [PATCH 11/19] chore: revert the code change --- source/libs/executor/src/scanoperator.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 08138e7700..c47e14ad0d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2105,7 +2105,6 @@ FETCH_NEXT_BLOCK: SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = pPacked->pDataBlock; - if (pBlock->info.parTbName[0]) { pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName); } From 11c0427b22217c88ef43de5572436ff66397fa26 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 16:41:41 +0800 Subject: [PATCH 12/19] enh: test case to delete mutiple tables --- source/dnode/vnode/src/sma/smaRollup.c | 21 ++-- .../tsim/sma/rsmaCreateInsertQueryDelete.sim | 96 ++++++++++++++++++- 2 files changed, 109 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6d2f76bbe9..72738ec289 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -711,8 +711,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma continue; } - smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), - output->info.id.uid, output->info.id.groupId, output->info.rows); + smaDebug("vgId:%d, result block, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 + ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, + SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid, + pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows); if (STREAM_GET_ALL == execType) { /** @@ -723,7 +725,11 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma // submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously output->info.version = pItem->submitReqVer; } else if (output->info.version == pItem->fetchResultVer) { - ASSERTS(0, "duplicated fetch version:%" PRIi64, pItem->fetchResultVer); + smaWarn("vgId:%d, result block, skip dup version, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64 + ", fetchResultVer:%" PRIi64 ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64 + ", rows:%" PRIi64, + SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid, + pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows); continue; } } @@ -752,8 +758,9 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma atomic_store_64(&pItem->fetchResultVer, output->info.version); } - smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, - SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); + smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level:%" PRIi8 + ", execType:%d, ver:%" PRIi64, + SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, execType, output->info.version); if (pReq) { tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); @@ -881,8 +888,8 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, return TSDB_CODE_FAILED; } - smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64 " nMsg:%d", SMA_VID(pSma), level, - RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize); + smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64 ", inputType:%d", SMA_VID(pSma), level, + RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType); if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) { smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); diff --git a/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim b/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim index 5f5c840eb6..594c062292 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database with retentions -sql create database d0 retentions -:7d,10s:21d,15s:365d; +sql create database d0 retentions -:7d,10s:21d,15s:365d vgroups 1; sql use d0 print =============== create super table and register rsma @@ -441,6 +441,100 @@ if $rows != 0 then return -1 endi +print =============== check delete multiple tables +sql create table ct2 using stb tags("BeiJing", "HaiDian"); +sql create table ct3 using stb tags("ShangHai", "PuDong"); + +sql insert into ct2 values(now, 10, NULL); +sql insert into ct2 values(now+60m, 1, NULL); +sql insert into ct2 values(now+120m, 100, NULL); +sql insert into ct3 values(now, 10, NULL); +sql insert into ct3 values(now+60m, 1, NULL); +sql insert into ct3 values(now+120m, 100, NULL); + +print =============== wait 5 seconds for results +sleep 5000 + +print =============== select * from retention level 2 from memory +sql select * from ct2; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +if $rows != 3 then + print retention level 2 file rows $rows != 3 + return -1 +endi + +if $data01 != 10.00000 then + return -1 +endi +if $data02 != NULL then + return -1 +endi +if $data11 != 1.00000 then + return -1 +endi +if $data12 != NULL then + return -1 +endi +if $data21 != 100.00000 then + return -1 +endi +if $data22 != NULL then + return -1 +endi + +sql delete from ct1 where ts < now + 120m; +sql delete from ct3 where ts < now; +sql delete from ct2 where ts < now + 60m; +sql delete from ct2 where ts < now + 120m; +sql delete from ct3 where ts < now + 60m; +sql delete from ct3 where ts < now + 120m; +sql delete from ct3 where ts < now; + +print =============== wait 5 seconds for results +sleep 5000 + +print =============== select * from retention level 2 from memory after delete ct2 +sql select * from ct2; +print $data00 $data01 $data02 +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + +print =============== select * from retention level 1 from memory after delete ct3 +sql select * from ct3 where ts > now - 8d; +print $data00 $data01 $data02 +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + + +#=================================================================== +system sh/exec.sh -n dnode1 -s stop -x SIGINT +#=================================================================== +system sh/exec.sh -n dnode1 -s start +print =============== wait 5 seconds for results after reboot +sleep 5000 + +print =============== select * from retention level 1 from memory after delete ct2 +sql select * from ct2 where ts > now - 8d; +print $data00 $data01 $data02 +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + +print =============== select * from retention level 2 from memory after delete ct3 +sql select * from ct3 where ts > now - 365d; +print $data00 $data01 $data02 +if $rows != 0 then + print retention level 2 file rows $rows != 0 + return -1 +endi + #=================================================================== system sh/exec.sh -n dnode1 -s stop -x SIGINT #=================================================================== \ No newline at end of file From bc446598267b205e3488cf9cddf4c2acc998855b Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 17:14:57 +0800 Subject: [PATCH 13/19] fix: buffer overflow --- source/dnode/vnode/src/tq/tqSink.c | 31 +++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index cf77679478..3a4b567360 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -54,28 +54,27 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName); - char tbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE]; for (int32_t row = 0; row < totalRows; row++) { int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row); int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); - char* name = NULL; - char* pName = NULL; - void* varTbName = NULL; - tbName[0] = '\0'; + char* name = NULL; + char* originName = NULL; + void* varTbName = NULL; if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) { varTbName = colDataGetVarData(pTbNameCol, row); } if (varTbName != NULL && varTbName != (void*)-1) { - name = varDataVal(varTbName); + name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); } else if (stbFullName) { - pName = buildCtbNameByGroupId(stbFullName, groupId); - name = pName; + name = buildCtbNameByGroupId(stbFullName, groupId); } else { - if (metaGetTableNameByUid(pTq->pVnode, groupId, tbName) == 0) { - name = varDataVal(tbName); + originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); + if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) { + name = varDataVal(originName); } else { terrno = TSDB_CODE_OUT_OF_MEMORY; } @@ -90,17 +89,19 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p return code; } - tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr, groupId, - name, skey, ekey); + tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, + pIdStr, groupId, name, skey, ekey); - SSingleDeleteReq req = {.startTs = skey, .endTs = ekey}; + SSingleDeleteReq req = { .startTs = skey, .endTs = ekey}; strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); - if (pName) taosMemoryFree(pName); + + if (originName) name = originName; + taosMemoryFree(name); taosArrayPush(deleteReq->deleteReqs, &req); } - return code; + return 0; } static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { From c04fb6d543a8fb7c2a610ca7757d54a078073666 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 17:23:17 +0800 Subject: [PATCH 14/19] enh: logic optimization --- source/dnode/vnode/src/tq/tqSink.c | 35 ++++++++++++------------------ 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3a4b567360..20a72e6a28 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -59,9 +59,9 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); - char* name = NULL; - char* originName = NULL; - void* varTbName = NULL; + char* name = NULL; + char* originName = NULL; + void* varTbName = NULL; if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) { varTbName = colDataGetVarData(pTbNameCol, row); } @@ -75,30 +75,23 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) { name = varDataVal(originName); - } else { - terrno = TSDB_CODE_OUT_OF_MEMORY; } } if (!name || *name == '\0') { - tqError("s-task:%s build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64 - " failed since invalid tbname:%s", - pIdStr, groupId, skey, ekey, name ? name : "NULL"); - taosArrayDestroy(deleteReq->deleteReqs); - code = terrno ? terrno : TSDB_CODE_APP_ERROR; - return code; + tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64 + " since invalid tbname:%s", + pIdStr, groupId, skey, ekey, name ? name : "NULL"); + } else { + tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr, + groupId, name, skey, ekey); + + SSingleDeleteReq req = {.startTs = skey, .endTs = ekey}; + strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); + taosArrayPush(deleteReq->deleteReqs, &req); } - - tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, - pIdStr, groupId, name, skey, ekey); - - SSingleDeleteReq req = { .startTs = skey, .endTs = ekey}; - strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); - if (originName) name = originName; - taosMemoryFree(name); - - taosArrayPush(deleteReq->deleteReqs, &req); + taosMemoryFreeClear(name); } return 0; From f06ea9fcebeed8344ebd1c2fe124c60809b7ccb5 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 17:32:33 +0800 Subject: [PATCH 15/19] enh: wait more time for result --- .../tsim/sma/rsmaCreateInsertQueryDelete.sim | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim b/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim index 594c062292..b93f6f0c44 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQueryDelete.sim @@ -29,8 +29,8 @@ sql insert into ct1 values(now, 10, NULL); sql insert into ct1 values(now+60m, 1, NULL); sql insert into ct1 values(now+120m, 100, NULL); -print =============== wait 5 seconds for results -sleep 5000 +print =============== wait 7 seconds for results +sleep 7000 print =============== select * from retention level 2 from memory sql select * from ct1; @@ -125,8 +125,8 @@ print =============== delete row 0 sql delete from ct1 where ts < now; sql delete from ct1 where ts < now; sql delete from ct1 where ts < now; -print =============== wait 5 seconds for results -sleep 5000 +print =============== wait 7 seconds for results +sleep 7000 print =============== select * from retention level 2 from memory after delete row 0 sql select * from ct1; @@ -206,8 +206,8 @@ sql delete from ct1 where ts < now + 60m; sql delete from ct1 where ts < now + 60m; sql delete from ct1 where ts < now + 60m; -print =============== wait 5 seconds for results -sleep 5000 +print =============== wait 7 seconds for results +sleep 7000 print =============== select * from retention level 2 from memory after delete row 1 sql select * from ct1; @@ -260,8 +260,8 @@ endi #=================================================================== system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s start -print =============== wait 5 seconds for results after reboot -sleep 5000 +print =============== wait 7 seconds for results after reboot +sleep 7000 print =============== select * from retention level 2 from memory after reboot sql select * from ct1; @@ -378,8 +378,8 @@ sql delete from ct1 where ts < now + 300m; sql delete from ct1 where ts < now + 60m; sql delete from ct1 where ts < now; -print =============== wait 5 seconds for results -sleep 5000 +print =============== wait 7 seconds for results +sleep 7000 print =============== select * from retention level 2 from memory after delete row 2 sql select * from ct1; @@ -411,8 +411,8 @@ endi #=================================================================== system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s start -print =============== wait 5 seconds for results after reboot -sleep 5000 +print =============== wait 7 seconds for results after reboot +sleep 7000 print =============== select * from retention level 2 from memory after delete row 2 sql select * from ct1; @@ -452,8 +452,8 @@ sql insert into ct3 values(now, 10, NULL); sql insert into ct3 values(now+60m, 1, NULL); sql insert into ct3 values(now+120m, 100, NULL); -print =============== wait 5 seconds for results -sleep 5000 +print =============== wait 7 seconds for results +sleep 7000 print =============== select * from retention level 2 from memory sql select * from ct2; @@ -492,8 +492,8 @@ sql delete from ct3 where ts < now + 60m; sql delete from ct3 where ts < now + 120m; sql delete from ct3 where ts < now; -print =============== wait 5 seconds for results -sleep 5000 +print =============== wait 7 seconds for results +sleep 7000 print =============== select * from retention level 2 from memory after delete ct2 sql select * from ct2; @@ -516,8 +516,8 @@ endi system sh/exec.sh -n dnode1 -s stop -x SIGINT #=================================================================== system sh/exec.sh -n dnode1 -s start -print =============== wait 5 seconds for results after reboot -sleep 5000 +print =============== wait 7 seconds for results after reboot +sleep 7000 print =============== select * from retention level 1 from memory after delete ct2 sql select * from ct2 where ts > now - 8d; From 9017e2ed689004c4788e9737636a5c6b7df0a05b Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 18:11:06 +0800 Subject: [PATCH 16/19] chore: code optimization --- source/dnode/vnode/src/sma/smaRollup.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 72738ec289..e813674c56 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -22,12 +22,12 @@ #define RSMA_FETCH_DELAY_MAX (120000) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms -#define RSMA_EXEC_MSG_HLEN (13) // type(int8_t) + len(int32_t) + version(int64_t) +#define RSMA_EXEC_TASK_FLAG "rsma" +#define RSMA_EXEC_MSG_HLEN (13) // type(int8_t) + len(int32_t) + version(int64_t) #define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg)) #define RSMA_EXEC_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t))) #define RSMA_EXEC_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t))) -#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t) + sizeof(int64_t))) -#define RSMA_TASK_FLAG "rsma" +#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN) #define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel) @@ -293,8 +293,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pStreamTask->id.streamId = pRSmaInfo->suid + idx; pStreamTask->chkInfo.startTs = taosGetTimestampMs(); pStreamTask->pMeta = pVnode->pTq->pStreamMeta; - pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1); - sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG); + pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_EXEC_TASK_FLAG) + 1); + sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG); pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta); tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); From 42e4e2b7a4b8c713c1d0a803a2cfc0ec01dd2a09 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 18:27:42 +0800 Subject: [PATCH 17/19] fix: macro define --- source/dnode/vnode/src/sma/smaRollup.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e813674c56..54a150145e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -27,7 +27,7 @@ #define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg)) #define RSMA_EXEC_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t))) #define RSMA_EXEC_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t))) -#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN) +#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN)) #define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel) From 3707857c1220bdd57257be54f08a4f67b0cf446f Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 10 Nov 2023 18:43:23 +0800 Subject: [PATCH 18/19] chore: remove obsolete code --- source/dnode/vnode/src/sma/smaRollup.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 54a150145e..5dc29509a0 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -653,8 +653,7 @@ static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatc SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, - .contLen = len + sizeof(SMsgHead), - .info.ahandle = level == 1 ? VND_RSMA1(pSma->pVnode) : VND_RSMA2(pSma->pVnode)}; + .contLen = len + sizeof(SMsgHead)}; code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg); TSDB_CHECK_CODE(code, lino, _exit); } From 1c2e9c18d554affacc458a18e26bcf6f3cd573ba Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 10 Nov 2023 22:01:28 +0800 Subject: [PATCH 19/19] Update tqSink.c --- source/dnode/vnode/src/tq/tqSink.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 20a72e6a28..c2e48d5d92 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -45,7 +45,6 @@ static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSData int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { - int32_t code = 0; int32_t totalRows = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);