From 6f377e32340b71ad7a59bf7700a00de7ec639356 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 8 Dec 2022 14:39:59 +0800 Subject: [PATCH] refact: tsma/rsma data format --- source/common/src/tdatablock.c | 24 +++++++----------- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 34 +++++++++++++++++--------- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 405b425d7b..84758a08c7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2246,23 +2246,17 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat continue; } - SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); - if (!pTbData) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } + SSubmitTbData pTbData = {0}; - if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) { - taosMemoryFree(pTbData); + if (!(pTbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; } - pTbData->suid = suid; - pTbData->uid = pDataBlock->info.id.groupId; - pTbData->sver = pTSchema->version; + pTbData.suid = suid; + pTbData.uid = pDataBlock->info.id.groupId; + pTbData.sver = pTSchema->version; if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) { - taosArrayDestroy(pTbData->aRowP); - taosMemoryFree(pTbData); + taosArrayDestroy(pTbData.aRowP); goto _end; } @@ -2360,14 +2354,14 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat } SRow* pRow = NULL; if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) { - tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitTbData(&pTbData, TSDB_MSG_FLG_ENCODE); goto _end; } ASSERT(pRow); - taosArrayPush(pTbData->aRowP, &pRow); + taosArrayPush(pTbData.aRowP, &pRow); } - taosArrayPush(pReq->aSubmitTbData, pTbData); + taosArrayPush(pReq->aSubmitTbData, &pTbData); } _end: taosArrayDestroy(pVals); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index bf028b0e76..5ccf011dfb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -215,7 +215,7 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); -int32_t tdProcessRSmaSubmit(SSma* pSma, void* pReq, void* pMsg, int32_t len, int32_t inputType); +int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 7de24467f8..ec3bae7415 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -741,6 +741,7 @@ _err: * @brief Copy msg to rsmaQueueBuffer for batch process * * @param pSma + * @param version * @param pMsg * @param len * @param inputType @@ -748,16 +749,20 @@ _err: * @param suid * @return int32_t */ -static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, - tb_uid_t suid) { - int32_t size = sizeof(int32_t) + len; +static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, + SRSmaInfo *pInfo, tb_uid_t suid) { + int32_t size = sizeof(int32_t) + sizeof(int64_t) + len; void *qItem = taosAllocateQitem(size, DEF_QITEM); if (!qItem) { return TSDB_CODE_FAILED; } - *(int32_t *)qItem = len; - memcpy(POINTER_SHIFT(qItem, sizeof(int32_t)), pMsg, len); + void *pItem = qItem; + + *(int32_t *)pItem = len; + pItem = POINTER_SHIFT(pItem, sizeof(int32_t)); + *(int64_t *)pItem = version; + memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len); taosWriteQitem(pInfo->queue, qItem); @@ -999,12 +1004,14 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { * @brief async mode * * @param pSma + * @param version * @param pMsg * @param inputType * @param suid * @return int32_t */ -static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, tb_uid_t suid) { +static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType, + tb_uid_t suid) { SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); @@ -1012,7 +1019,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - if (tdExecuteRSmaImplAsync(pSma, pMsg, len, inputType, pRSmaInfo, suid) < 0) { + if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_FAILED; } @@ -1034,7 +1041,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int return TSDB_CODE_SUCCESS; } -int32_t tdProcessRSmaSubmit(SSma *pSma, void *pReq, void *pMsg, int32_t len, int32_t inputType) { +int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); if (!pEnv) { // only applicable when rsma env exists @@ -1054,7 +1061,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pReq, void *pMsg, int32_t len, int } if (uidStore.suid != 0) { - if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, uidStore.suid) < 0) { + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, uidStore.suid) < 0) { smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr()); goto _err; } @@ -1062,7 +1069,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pReq, void *pMsg, int32_t len, int void *pIter = NULL; while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, *pTbSuid) < 0) { + if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, *pTbSuid) < 0) { smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr()); goto _err; } @@ -1370,7 +1377,7 @@ _end: static void tdFreeRSmaSubmitItems(SArray *pItems) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { SPackedData *packData = taosArrayGet(pItems, i); - taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t))); + taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t) - sizeof(int64_t))); } taosArrayClear(pItems); } @@ -1439,7 +1446,10 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA void *msg = NULL; taosGetQitem(qall, (void **)&msg); if (msg) { - SPackedData packData = {.msgLen = *(int32_t *)msg, .msgStr = POINTER_SHIFT(msg, sizeof(int32_t))}; + SPackedData packData = {.msgLen = *(int32_t *)msg, + .ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)), + .msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t))}; + if (!taosArrayPush(pSubmitArr, &packData)) { tdFreeRSmaSubmitItems(pSubmitArr); goto _err; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f8ef39b516..cbfc16ce4a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1026,7 +1026,7 @@ _exit: atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); if (code == 0) { atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1); - tdProcessRSmaSubmit(pVnode->pSma, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT); + tdProcessRSmaSubmit(pVnode->pSma, version, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT); } // clear